Flask
- A small, lean, extensible web framework
Many third-party extensions are available: http://flask.pocoo.org/extensions/
blogs: https://www.cnblogs.com/wupeiqi/articles/7552008.html
- Depends on WSGI through Werkzeug (Django also depends on WSGI; Tornado apparently does not)
[What is WSGI?]
Baidu Baike: https://baike.baidu.com/item/wsgi/3381529?fr=aladdin
WSGI interface: https://www.liaoxuefeng.com/wiki/001374738125095c955c1e6d8bb493182103fac9270762a000/001386832689740b04430a98f614b6da89da2157ea3efe2000
Install: pip install flask
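from flask import Flask

app = Flask(__name__)  # give the application object a name
print(__name__)
app.secret_key = "a secret key (salt) must be set before using session; the session here is actually stored in cookies"
[ About __name__
1. This built-in variable holds the name the current module is running under: if the module is being run directly as the program, __name__ is '__main__'; otherwise it is the module's own name.
2. It usually serves as the program entry point, similar to main in C; in large projects especially, if __name__ == "__main__": often marks where the whole project starts running. From IAMoldpan's CSDN blog; full text at: https://blog.csdn.net/iamoldpan/article/details/78077983?utm_source=copy ]

@app.route('/home', methods=["GET", "POST"])  # only GET is enabled by default; POST is not
def home():
    return 'home'

if __name__ == '__main__':
    app.run()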
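The note on __name__ can be checked with a small standard-library sketch. The file name demo_mod.py and the variable NAME_SEEN are made up for illustration; the same file is executed once as a script and once as an imported module.

```python
import importlib.util
import os
import runpy
import tempfile

# A one-line module that records the name it was executed under.
source = "NAME_SEEN = __name__\n"

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "demo_mod.py")  # hypothetical file name
    with open(path, "w") as fh:
        fh.write(source)

    # Run the file the way "python demo_mod.py" would.
    as_script = runpy.run_path(path, run_name="__main__")["NAME_SEEN"]

    # Load the same file as an ordinary module named "demo_mod".
    spec = importlib.util.spec_from_file_location("demo_mod", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    as_import = module.NAME_SEEN

print(as_script, as_import)
```

This is why the if __name__ == '__main__': guard only fires when the file is run directly.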
class Flask:
    def __init__(
        self,
        import_name,
        static_url_path=None,  # URL prefix (alias) for static files, e.g. <img src="/static/mr.jpg">
                               # with static_url_path='/ccc' the tag becomes <img src="/ccc/mr.jpg">; static_folder='static' stays unchanged
        static_folder='static',  # default folder for static files
        static_host=None,
        host_matching=False,
        subdomain_matching=False,
        template_folder='templates',  # default folder for template files
        instance_path=None,
        instance_relative_config=False,
        root_path=None
    ):
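A minimal sketch of how static_url_path changes the URL while static_folder stays the same. The values '/ccc' and 'mr.jpg' are only illustrative, and the example assumes Flask is installed.

```python
from flask import Flask, url_for

# The URL prefix is changed to '/ccc'; files are still read from the 'static' folder.
app = Flask(__name__, static_url_path='/ccc', static_folder='static')

# url_for needs a request context; test_request_context provides one without a server.
with app.test_request_context():
    static_url = url_for('static', filename='mr.jpg')

print(static_url)
```

Note that the prefix is written with a leading slash, since URL rules must start with one.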
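Configuration:
Routing:
Views:
Request:
Response:
Template rendering:
session:
Flash messages:
Middleware:
Blueprints: (blueprint) organizing the directory structure
Special decorators: similar to Django middleware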
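dir() without arguments returns the list of variables, methods, and defined types in the current scope;
with an argument, it returns the list of that argument's attributes and methods. If the argument defines __dir__(), that method is called.
If the argument does not define __dir__(), dir() gathers as much information about the argument as it can.
getattr() returns the value of an attribute of an object.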
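import importlib

path = "settings.Foo"
p, c = path.rsplit('.', maxsplit=1)  # split into module path and class name
m = importlib.import_module(p)       # import the module by its dotted name
cls = getattr(m, c)                  # fetch the class from the module
https://www.cnblogs.com/yanxiatingyu/p/9851249.html
# How is the class found? Import the module from the string, then use getattr() on it.
for key in dir(cls):
    if key.isupper():
        print(key, getattr(cls, key))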
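The same lookup as a self-contained sketch. A stand-in settings module is created in memory so the example runs without a settings.py on disk. The class Foo, its attributes, and the helper load_upper_attrs are hypothetical.

```python
import importlib
import sys
import types

# Hypothetical stand-in for settings.py, registered so that import_module can find it.
settings = types.ModuleType("settings")


class Foo:
    DEBUG = True
    SECRET_KEY = "dev"
    lowercase_value = "ignored"


settings.Foo = Foo
sys.modules["settings"] = settings


def load_upper_attrs(dotted_path):
    """Import 'module.Class' from a string and collect its uppercase attributes."""
    module_path, class_name = dotted_path.rsplit(".", maxsplit=1)
    module = importlib.import_module(module_path)
    cls = getattr(module, class_name)
    return {key: getattr(cls, key) for key in dir(cls) if key.isupper()}


loaded = load_upper_attrs("settings.Foo")
print(loaded)
```

Only DEBUG and SECRET_KEY are collected; lowercase_value and the dunder attributes fail the isupper() check.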
from datetime import timedelta
app.config
{'APPLICATION_ROOT': '/',
'DEBUG': False,
'ENV': 'production',
'EXPLAIN_TEMPLATE_LOADING': False,
'JSONIFY_MIMETYPE': 'application/json',
'JSONIFY_PRETTYPRINT_REGULAR': False,
'JSON_AS_ASCII': True,
'JSON_SORT_KEYS': True,
'MAX_CONTENT_LENGTH': None,
'MAX_COOKIE_SIZE': 4093,
'PERMANENT_SESSION_LIFETIME': datetime.timedelta(31), # maximum session lifetime; print(datetime.timedelta(31)) >> 31 days, 0:00:00
'PREFERRED_URL_SCHEME': 'http',
'PRESERVE_CONTEXT_ON_EXCEPTION': None,
'PROPAGATE_EXCEPTIONS': None,
'SECRET_KEY': None,
'SEND_FILE_MAX_AGE_DEFAULT': datetime.timedelta(0, 43200),
'SERVER_NAME': None,
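'SESSION_COOKIE_DOMAIN': None, # domain the session cookie is valid for
'SESSION_COOKIE_HTTPONLY': True,
'SESSION_COOKIE_NAME': 'session', # name of the session cookie
'SESSION_COOKIE_PATH': None, # path the session cookie is valid for
'SESSION_COOKIE_SAMESITE': None, #
'SESSION_COOKIE_SECURE': False, # security: whether the cookie is sent over HTTPS only
'SESSION_REFRESH_EACH_REQUEST': True, # refresh on every request, so the lifetime counts from the last visit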
'TEMPLATES_AUTO_RELOAD': None,
'TESTING': False,
'TRAP_BAD_REQUEST_ERRORS': None,
'TRAP_HTTP_EXCEPTIONS': False,
'USE_X_SENDFILE': False}
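A small sketch of how the session-related keys above take effect, assuming Flask is installed. The secret value, the cookie name my_session, the /login route, and the user name are all illustrative.

```python
from datetime import timedelta

from flask import Flask, session

app = Flask(__name__)
app.secret_key = "dev-only-secret"  # hypothetical value; use a random secret in practice
app.config["PERMANENT_SESSION_LIFETIME"] = timedelta(days=7)
app.config["SESSION_COOKIE_NAME"] = "my_session"


@app.route("/login")
def login():
    session.permanent = True   # apply PERMANENT_SESSION_LIFETIME to this session
    session["user"] = "alice"  # stored in the signed cookie, not on the server
    return "logged in"


client = app.test_client()
response = client.get("/login")
set_cookie = response.headers.get("Set-Cookie", "")
print(set_cookie)
```

The printed header shows the renamed cookie together with the HttpOnly flag that SESSION_COOKIE_HTTPONLY enables by default.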
settings.py
class Config(object):
    DEBUG = False
    TESTING = False
    DATABASE_URI = 'sqlite://memory:'  # ???

class ProductionConfig(Config):
    DEBUG = True

class TestingConfig(Config):
    TESTING = True
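Method 1
app.config.from_object("settings.ProductionConfig")  # production; each class holds different settings, so you can switch between them
app.config.from_object("settings.DevelopmentConfig")  # development (requires a DevelopmentConfig class, which the settings.py above does not define)
app.config.from_object("settings.TestingConfig")  # testing
Settings are defined as attributes on a class
Method 2
app.config['DEBUG'] = True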
app.config.from_object('settings.class_name')
def from_object(self, obj):
    """Updates the values from the given object. An object can be of one
    of the following two types:

    - a string: in this case the object with that name will be imported
    - an actual object reference: that object is used directly

    Objects are usually either modules or classes. :meth:`from_object`
    loads only the uppercase attributes of the module/class. A ``dict``
    object will not work with :meth:`from_object` because the keys of a
    ``dict`` are not attributes of the ``dict`` class.

    Example of module-based configuration::

        app.config.from_object('yourapplication.default_config')
        from yourapplication import default_config
        app.config.from_object(default_config)

    You should not use this function to load the actual configuration but
    rather configuration defaults. The actual config should be loaded
    with :meth:`from_pyfile` and ideally from a location not within the
    package because the package might be installed system wide.

    See :ref:`config-dev-prod` for an example of class-based configuration
    using :meth:`from_object`.

    :param obj: an import name or object
    """
    if isinstance(obj, string_types):
        obj = import_string(obj)
    for key in dir(obj):
        if key.isupper():
            self[key] = getattr(obj, key)
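A short sketch of class-based configuration with from_object, assuming Flask is installed. The classes are defined inline and passed as an object reference rather than as an import string, and lowercase_note is a made-up attribute that shows lowercase names are skipped.

```python
from flask import Flask


class Config(object):
    DEBUG = False
    TESTING = False
    lowercase_note = "not loaded"  # hypothetical attribute; lowercase names are ignored


class TestingConfig(Config):
    TESTING = True  # overrides the inherited default


app = Flask(__name__)
app.config.from_object(TestingConfig)  # an object reference works as well as a string

print(app.config["TESTING"], app.config["DEBUG"], "lowercase_note" in app.config)
```

Inherited uppercase attributes such as DEBUG are picked up too, because dir() on a class includes names defined on its base classes.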
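[
isinstance() checks whether an object is an instance of a known type, similar to type().
Difference between isinstance() and type():
type() does not treat an instance of a subclass as an instance of the parent type; it ignores inheritance.
isinstance() treats an instance of a subclass as an instance of the parent type; it takes inheritance into account.
To check whether an object is of a given type, isinstance() is the recommended choice.
]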
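The difference can be seen with two throwaway classes. Parent and Child are illustrative names.

```python
class Parent:
    pass


class Child(Parent):
    pass


child = Child()

# type() compares the exact class and ignores inheritance.
type_says_parent = type(child) is Parent

# isinstance() follows the inheritance chain.
isinstance_says_parent = isinstance(child, Parent)

print(type_says_parent, isinstance_says_parent)
```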
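Routing
- endpoint: used to build URLs in reverse; defaults to the function name [similar to name in Django, where the URL is resolved in reverse from the name]
- url_for('endpoint'): can be called directly inside another view function as url_for(endpoint_name) to resolve the URL in reverse
from flask import url_for

@app.route('/index', methods=['GET', 'POST'], endpoint='n1')
def index():
    print(url_for('n1'))
    return 'index'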
Dynamic routes
@app.route('/index/<int:nid>', methods=['GET', 'POST'])
def index(nid):
    print(url_for('index', nid=nid))  # the dynamic part is passed as a keyword argument
    return 'index'
The commonly used route patterns are all handled through the following mapping:
7 converters
DEFAULT_CONVERTERS = {
    'default': UnicodeConverter,
    'string': UnicodeConverter,
    'any': AnyConverter,
    'path': PathConverter,
    'int': IntegerConverter,
    'float': FloatConverter,
    'uuid': UUIDConverter,
}
With endpoint=name defined: call url_for(name) directly
Without it: endpoint defaults to the function name, so call url_for(function_name)
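A runnable sketch of both cases, assuming Flask is installed. The route paths and the view name detail are illustrative; the test client is used so no server has to be started.

```python
from flask import Flask, url_for

app = Flask(__name__)


@app.route('/index', endpoint='n1')  # explicit endpoint name
def index():
    return 'index'


@app.route('/detail/<int:nid>')  # no endpoint given, so it defaults to 'detail'
def detail(nid):
    return 'detail %d' % nid


with app.test_request_context():
    named_url = url_for('n1')
    default_url = url_for('detail', nid=7)

client = app.test_client()
ok_status = client.get('/detail/7').status_code
bad_status = client.get('/detail/abc').status_code  # 'abc' does not match the int converter

print(named_url, default_url, ok_status, bad_status)
```

A value that the converter rejects never reaches the view; the request simply fails to match any route.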
Views:
FBV (function-based views)
CBV (class-based views)
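A minimal sketch contrasting the two styles, assuming Flask is installed. The paths /fbv and /cbv and the class ItemView are illustrative; the class-based view uses flask.views.MethodView, which dispatches on the HTTP method.

```python
from flask import Flask
from flask.views import MethodView

app = Flask(__name__)


# FBV: a plain function registered with the route decorator.
@app.route('/fbv')
def fbv():
    return 'fbv GET'


# CBV: one method per HTTP verb, registered through add_url_rule.
class ItemView(MethodView):
    def get(self):
        return 'cbv GET'

    def post(self):
        return 'cbv POST'


app.add_url_rule('/cbv', view_func=ItemView.as_view('item'))  # 'item' is the endpoint name

client = app.test_client()
fbv_get = client.get('/fbv').get_data(as_text=True)
cbv_get = client.get('/cbv').get_data(as_text=True)
cbv_post = client.post('/cbv').get_data(as_text=True)
print(fbv_get, cbv_get, cbv_post)
```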
Request data:
# Request information
# request.method  # current request method
# request.args  # GET (query string) parameters
# request.form  # POST form data
# request.values
# request.cookies
# request.headers
# request.path  # path of the current request
# request.full_path
# request.script_root
# request.url
# request.base_url
# request.url_root
# request.host_url
# request.host
# request.files  # ImmutableMultiDict([('upload_file', <FileStorage: 'QQ圖片20180911115431.png' ('image/png')>)])
# obj = request.files['the_file_name']  # <FileStorage: 'QQ圖片20180911115431.png' ('image/png')>
# obj.save('/var/www/uploads/' + secure_filename(obj.filename))
# Check the request method
if request.method == 'GET':
    print('GET')
else:
    print('POST')
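A few of these attributes in action, as a sketch that assumes Flask is installed. The /echo route and the parameter names are made up; the view echoes back what it sees so the values can be inspected from the test client.

```python
from flask import Flask, jsonify, request

app = Flask(__name__)


@app.route('/echo', methods=['GET', 'POST'])
def echo():
    # Report what the request object exposes for this call.
    return jsonify({
        'method': request.method,
        'path': request.path,
        'args': request.args.to_dict(),  # query-string parameters
        'form': request.form.to_dict(),  # form fields from the request body
    })


client = app.test_client()
get_info = client.get('/echo?page=2').get_json()
post_info = client.post('/echo', data={'name': 'flask'}).get_json()
print(get_info)
print(post_info)
```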
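Response
from flask import jsonify  # serializes for you internally (similar to JsonResponse in Django)
import json
Return values:
Whatever is returned becomes the response body:
return "hello"
return json.dumps(dic)
return render_template('index.html')
return redirect('')
How do you customize response headers?
Set cookies
from flask import make_response
obj = make_response(jsonify({"test": 'message'}))
obj.headers['X-Custom-Header'] = 'ha ha ha'  # header names should be ASCII
obj.set_cookie('key', 'value')
return obj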
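The same steps as a complete view, assuming Flask is installed. The route /custom and the header name X-Custom-Header are illustrative; the test client then reads back the header, cookie, and JSON body.

```python
from flask import Flask, jsonify, make_response

app = Flask(__name__)


@app.route('/custom')
def custom():
    # Wrap the body in a response object so headers and cookies can be set on it.
    response = make_response(jsonify({'test': 'message'}))
    response.headers['X-Custom-Header'] = 'demo'
    response.set_cookie('key', 'value')
    return response


client = app.test_client()
result = client.get('/custom')
print(result.headers['X-Custom-Header'], result.headers.get('Set-Cookie'))
```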
class BaseRequest(object): """Very basic request object. This does not implement advanced stuff like entity tag parsing or cache controls. The request object is created with the WSGI environment as first argument and will add itself to the WSGI environment as ``'werkzeug.request'`` unless it's created with `populate_request` set to False. There are a couple of mixins available that add additional functionality to the request object, there is also a class called `Request` which subclasses `BaseRequest` and all the important mixins. It's a good idea to create a custom subclass of the :class:`BaseRequest` and add missing functionality either via mixins or direct implementation. Here an example for such subclasses:: from werkzeug.wrappers import BaseRequest, ETagRequestMixin class Request(BaseRequest, ETagRequestMixin): pass Request objects are **read only**. As of 0.5 modifications are not allowed in any place. Unlike the lower level parsing functions the request object will use immutable objects everywhere possible. Per default the request object will assume all the text data is `utf-8` encoded. Please refer to `the unicode chapter <unicode.txt>`_ for more details about customizing the behavior. Per default the request object will be added to the WSGI environment as `werkzeug.request` to support the debugging system. If you don't want that, set `populate_request` to `False`. If `shallow` is `True` the environment is initialized as shallow object around the environ. Every operation that would modify the environ in any way (such as consuming form data) raises an exception unless the `shallow` attribute is explicitly set to `False`. This is useful for middlewares where you don't want to consume the form data by accident. A shallow request is not populated to the WSGI environment. .. versionchanged:: 0.5 read-only mode was enforced by using immutables classes for all data. 
""" #: the charset for the request, defaults to utf-8 charset = 'utf-8' #: the error handling procedure for errors, defaults to 'replace' encoding_errors = 'replace' #: the maximum content length. This is forwarded to the form data #: parsing function (:func:`parse_form_data`). When set and the #: :attr:`form` or :attr:`files` attribute is accessed and the #: parsing fails because more than the specified value is transmitted #: a :exc:`~werkzeug.exceptions.RequestEntityTooLarge` exception is raised. #: #: Have a look at :ref:`dealing-with-request-data` for more details. #: #: .. versionadded:: 0.5 max_content_length = None #: the maximum form field size. This is forwarded to the form data #: parsing function (:func:`parse_form_data`). When set and the #: :attr:`form` or :attr:`files` attribute is accessed and the #: data in memory for post data is longer than the specified value a #: :exc:`~werkzeug.exceptions.RequestEntityTooLarge` exception is raised. #: #: Have a look at :ref:`dealing-with-request-data` for more details. #: #: .. versionadded:: 0.5 max_form_memory_size = None #: the class to use for `args` and `form`. The default is an #: :class:`~werkzeug.datastructures.ImmutableMultiDict` which supports #: multiple values per key. alternatively it makes sense to use an #: :class:`~werkzeug.datastructures.ImmutableOrderedMultiDict` which #: preserves order or a :class:`~werkzeug.datastructures.ImmutableDict` #: which is the fastest but only remembers the last key. It is also #: possible to use mutable structures, but this is not recommended. #: #: .. versionadded:: 0.6 parameter_storage_class = ImmutableMultiDict #: the type to be used for list values from the incoming WSGI environment. #: By default an :class:`~werkzeug.datastructures.ImmutableList` is used #: (for example for :attr:`access_list`). #: #: .. versionadded:: 0.6 list_storage_class = ImmutableList #: the type to be used for dict values from the incoming WSGI environment. 
#: By default an #: :class:`~werkzeug.datastructures.ImmutableTypeConversionDict` is used #: (for example for :attr:`cookies`). #: #: .. versionadded:: 0.6 dict_storage_class = ImmutableTypeConversionDict #: The form data parser that shoud be used. Can be replaced to customize #: the form date parsing. form_data_parser_class = FormDataParser #: Optionally a list of hosts that is trusted by this request. By default #: all hosts are trusted which means that whatever the client sends the #: host is will be accepted. #: #: This is the recommended setup as a webserver should manually be set up #: to only route correct hosts to the application, and remove the #: `X-Forwarded-Host` header if it is not being used (see #: :func:`werkzeug.wsgi.get_host`). #: #: .. versionadded:: 0.9 trusted_hosts = None #: Indicates whether the data descriptor should be allowed to read and #: buffer up the input stream. By default it's enabled. #: #: .. versionadded:: 0.9 disable_data_descriptor = False def __init__(self, environ, populate_request=True, shallow=False): self.environ = environ if populate_request and not shallow: self.environ['werkzeug.request'] = self self.shallow = shallow def __repr__(self): # make sure the __repr__ even works if the request was created # from an invalid WSGI environment. If we display the request # in a debug session we don't want the repr to blow up. args = [] try: args.append("'%s'" % to_native(self.url, self.url_charset)) args.append('[%s]' % self.method) except Exception: args.append('(invalid WSGI environ)') return '<%s %s>' % ( self.__class__.__name__, ' '.join(args) ) @property def url_charset(self): """The charset that is assumed for URLs. Defaults to the value of :attr:`charset`. .. versionadded:: 0.6 """ return self.charset @classmethod def from_values(cls, *args, **kwargs): """Create a new request object based on the values provided. If environ is given missing values are filled from there. 
This method is useful for small scripts when you need to simulate a request from an URL. Do not use this method for unittesting, there is a full featured client object (:class:`Client`) that allows to create multipart requests, support for cookies etc. This accepts the same options as the :class:`~werkzeug.test.EnvironBuilder`. .. versionchanged:: 0.5 This method now accepts the same arguments as :class:`~werkzeug.test.EnvironBuilder`. Because of this the `environ` parameter is now called `environ_overrides`. :return: request object """ from werkzeug.test import EnvironBuilder charset = kwargs.pop('charset', cls.charset) kwargs['charset'] = charset builder = EnvironBuilder(*args, **kwargs) try: return builder.get_request(cls) finally: builder.close() @classmethod def application(cls, f): """Decorate a function as responder that accepts the request as first argument. This works like the :func:`responder` decorator but the function is passed the request object as first argument and the request object will be closed automatically:: @Request.application def my_wsgi_app(request): return Response('Hello World!') As of Werkzeug 0.14 HTTP exceptions are automatically caught and converted to responses instead of failing. :param f: the WSGI callable to decorate :return: a new WSGI callable """ #: return a callable that wraps the -2nd argument with the request #: and calls the function with all the arguments up to that one and #: the request. The return value is then called with the latest #: two arguments. This makes it possible to use this decorator for #: both methods and standalone WSGI functions. 
from werkzeug.exceptions import HTTPException def application(*args): request = cls(args[-2]) with request: try: resp = f(*args[:-2] + (request,)) except HTTPException as e: resp = e.get_response(args[-2]) return resp(*args[-2:]) return update_wrapper(application, f) def _get_file_stream(self, total_content_length, content_type, filename=None, content_length=None): """Called to get a stream for the file upload. This must provide a file-like class with `read()`, `readline()` and `seek()` methods that is both writeable and readable. The default implementation returns a temporary file if the total content length is higher than 500KB. Because many browsers do not provide a content length for the files only the total content length matters. :param total_content_length: the total content length of all the data in the request combined. This value is guaranteed to be there. :param content_type: the mimetype of the uploaded file. :param filename: the filename of the uploaded file. May be `None`. :param content_length: the length of this file. This value is usually not provided because webbrowsers do not provide this value. """ return default_stream_factory( total_content_length=total_content_length, content_type=content_type, filename=filename, content_length=content_length) @property def want_form_data_parsed(self): """Returns True if the request method carries content. As of Werkzeug 0.9 this will be the case if a content type is transmitted. .. versionadded:: 0.8 """ return bool(self.environ.get('CONTENT_TYPE')) def make_form_data_parser(self): """Creates the form data parser. Instantiates the :attr:`form_data_parser_class` with some parameters. .. versionadded:: 0.8 """ return self.form_data_parser_class(self._get_file_stream, self.charset, self.encoding_errors, self.max_form_memory_size, self.max_content_length, self.parameter_storage_class) def _load_form_data(self): """Method used internally to retrieve submitted data. 
After calling this sets `form` and `files` on the request object to multi dicts filled with the incoming form data. As a matter of fact the input stream will be empty afterwards. You can also call this method to force the parsing of the form data. .. versionadded:: 0.8 """ # abort early if we have already consumed the stream if 'form' in self.__dict__: return _assert_not_shallow(self) if self.want_form_data_parsed: content_type = self.environ.get('CONTENT_TYPE', '') content_length = get_content_length(self.environ) mimetype, options = parse_options_header(content_type) parser = self.make_form_data_parser() data = parser.parse(self._get_stream_for_parsing(), mimetype, content_length, options) else: data = (self.stream, self.parameter_storage_class(), self.parameter_storage_class()) # inject the values into the instance dict so that we bypass # our cached_property non-data descriptor. d = self.__dict__ d['stream'], d['form'], d['files'] = data def _get_stream_for_parsing(self): """This is the same as accessing :attr:`stream` with the difference that if it finds cached data from calling :meth:`get_data` first it will create a new stream out of the cached data. .. versionadded:: 0.9.3 """ cached_data = getattr(self, '_cached_data', None) if cached_data is not None: return BytesIO(cached_data) return self.stream def close(self): """Closes associated resources of this request object. This closes all file handles explicitly. You can also use the request object in a with statement which will automatically close it. .. versionadded:: 0.9 """ files = self.__dict__.get('files') for key, value in iter_multi_items(files or ()): value.close() def __enter__(self): return self def __exit__(self, exc_type, exc_value, tb): self.close() @cached_property def stream(self): """ If the incoming form data was not encoded with a known mimetype the data is stored unmodified in this stream for consumption. 
Most of the time it is a better idea to use :attr:`data` which will give you that data as a string. The stream only returns the data once. Unlike :attr:`input_stream` this stream is properly guarded that you can't accidentally read past the length of the input. Werkzeug will internally always refer to this stream to read data which makes it possible to wrap this object with a stream that does filtering. .. versionchanged:: 0.9 This stream is now always available but might be consumed by the form parser later on. Previously the stream was only set if no parsing happened. """ _assert_not_shallow(self) return get_input_stream(self.environ) input_stream = environ_property('wsgi.input', """ The WSGI input stream. In general it's a bad idea to use this one because you can easily read past the boundary. Use the :attr:`stream` instead. """) @cached_property def args(self): """The parsed URL parameters (the part in the URL after the question mark). By default an :class:`~werkzeug.datastructures.ImmutableMultiDict` is returned from this function. This can be changed by setting :attr:`parameter_storage_class` to a different type. This might be necessary if the order of the form data is important. """ return url_decode(wsgi_get_bytes(self.environ.get('QUERY_STRING', '')), self.url_charset, errors=self.encoding_errors, cls=self.parameter_storage_class) @cached_property def data(self): """ Contains the incoming request data as string in case it came with a mimetype Werkzeug does not handle. """ if self.disable_data_descriptor: raise AttributeError('data descriptor is disabled') # XXX: this should eventually be deprecated. # We trigger form data parsing first which means that the descriptor # will not cache the data that would otherwise be .form or .files # data. This restores the behavior that was there in Werkzeug # before 0.9. New code should use :meth:`get_data` explicitly as # this will make behavior explicit. 
return self.get_data(parse_form_data=True) def get_data(self, cache=True, as_text=False, parse_form_data=False): """This reads the buffered incoming data from the client into one bytestring. By default this is cached but that behavior can be changed by setting `cache` to `False`. Usually it's a bad idea to call this method without checking the content length first as a client could send dozens of megabytes or more to cause memory problems on the server. Note that if the form data was already parsed this method will not return anything as form data parsing does not cache the data like this method does. To implicitly invoke form data parsing function set `parse_form_data` to `True`. When this is done the return value of this method will be an empty string if the form parser handles the data. This generally is not necessary as if the whole data is cached (which is the default) the form parser will used the cached data to parse the form data. Please be generally aware of checking the content length first in any case before calling this method to avoid exhausting server memory. If `as_text` is set to `True` the return value will be a decoded unicode string. .. versionadded:: 0.9 """ rv = getattr(self, '_cached_data', None) if rv is None: if parse_form_data: self._load_form_data() rv = self.stream.read() if cache: self._cached_data = rv if as_text: rv = rv.decode(self.charset, self.encoding_errors) return rv @cached_property def form(self): """The form parameters. By default an :class:`~werkzeug.datastructures.ImmutableMultiDict` is returned from this function. This can be changed by setting :attr:`parameter_storage_class` to a different type. This might be necessary if the order of the form data is important. Please keep in mind that file uploads will not end up here, but instead in the :attr:`files` attribute. .. versionchanged:: 0.9 Previous to Werkzeug 0.9 this would only contain form data for POST and PUT requests. 
""" self._load_form_data() return self.form @cached_property def values(self): """A :class:`werkzeug.datastructures.CombinedMultiDict` that combines :attr:`args` and :attr:`form`.""" args = [] for d in self.args, self.form: if not isinstance(d, MultiDict): d = MultiDict(d) args.append(d) return CombinedMultiDict(args) @cached_property def files(self): """:class:`~werkzeug.datastructures.MultiDict` object containing all uploaded files. Each key in :attr:`files` is the name from the ``<input type="file" name="">``. Each value in :attr:`files` is a Werkzeug :class:`~werkzeug.datastructures.FileStorage` object. It basically behaves like a standard file object you know from Python, with the difference that it also has a :meth:`~werkzeug.datastructures.FileStorage.save` function that can store the file on the filesystem. Note that :attr:`files` will only contain data if the request method was POST, PUT or PATCH and the ``<form>`` that posted to the request had ``enctype="multipart/form-data"``. It will be empty otherwise. See the :class:`~werkzeug.datastructures.MultiDict` / :class:`~werkzeug.datastructures.FileStorage` documentation for more details about the used data structure. """ self._load_form_data() return self.files @cached_property def cookies(self): """A :class:`dict` with the contents of all cookies transmitted with the request.""" return parse_cookie(self.environ, self.charset, self.encoding_errors, cls=self.dict_storage_class) @cached_property def headers(self): """The headers from the WSGI environ as immutable :class:`~werkzeug.datastructures.EnvironHeaders`. """ return EnvironHeaders(self.environ) @cached_property def path(self): """Requested path as unicode. This works a bit like the regular path info in the WSGI environment but will always include a leading slash, even if the URL root is accessed. 
""" raw_path = wsgi_decoding_dance(self.environ.get('PATH_INFO') or '', self.charset, self.encoding_errors) return '/' + raw_path.lstrip('/') @cached_property def full_path(self): """Requested path as unicode, including the query string.""" return self.path + u'?' + to_unicode(self.query_string, self.url_charset) @cached_property def script_root(self): """The root path of the script without the trailing slash.""" raw_path = wsgi_decoding_dance(self.environ.get('SCRIPT_NAME') or '', self.charset, self.encoding_errors) return raw_path.rstrip('/') @cached_property def url(self): """The reconstructed current URL as IRI. See also: :attr:`trusted_hosts`. """ return get_current_url(self.environ, trusted_hosts=self.trusted_hosts) @cached_property def base_url(self): """Like :attr:`url` but without the querystring See also: :attr:`trusted_hosts`. """ return get_current_url(self.environ, strip_querystring=True, trusted_hosts=self.trusted_hosts) @cached_property def url_root(self): """The full URL root (with hostname), this is the application root as IRI. See also: :attr:`trusted_hosts`. """ return get_current_url(self.environ, True, trusted_hosts=self.trusted_hosts) @cached_property def host_url(self): """Just the host with scheme as IRI. See also: :attr:`trusted_hosts`. """ return get_current_url(self.environ, host_only=True, trusted_hosts=self.trusted_hosts) @cached_property def host(self): """Just the host including the port if available. See also: :attr:`trusted_hosts`. """ return get_host(self.environ, trusted_hosts=self.trusted_hosts) query_string = environ_property( 'QUERY_STRING', '', read_only=True, load_func=wsgi_get_bytes, doc='The URL parameters as raw bytestring.') method = environ_property( 'REQUEST_METHOD', 'GET', read_only=True, load_func=lambda x: x.upper(), doc="The request method. 
(For example ``'GET'`` or ``'POST'``).") @cached_property def access_route(self): """If a forwarded header exists this is a list of all ip addresses from the client ip to the last proxy server. """ if 'HTTP_X_FORWARDED_FOR' in self.environ: addr = self.environ['HTTP_X_FORWARDED_FOR'].split(',') return self.list_storage_class([x.strip() for x in addr]) elif 'REMOTE_ADDR' in self.environ: return self.list_storage_class([self.environ['REMOTE_ADDR']]) return self.list_storage_class() @property def remote_addr(self): """The remote address of the client.""" return self.environ.get('REMOTE_ADDR') remote_user = environ_property('REMOTE_USER', doc=''' If the server supports user authentication, and the script is protected, this attribute contains the username the user has authenticated as.''') scheme = environ_property('wsgi.url_scheme', doc=''' URL scheme (http or https). .. versionadded:: 0.7''') @property def is_xhr(self): """True if the request was triggered via a JavaScript XMLHttpRequest. This only works with libraries that support the ``X-Requested-With`` header and set it to "XMLHttpRequest". Libraries that do that are prototype, jQuery and Mochikit and probably some more. .. deprecated:: 0.13 ``X-Requested-With`` is not standard and is unreliable. """ warn(DeprecationWarning( 'Request.is_xhr is deprecated. 
Given that the X-Requested-With ' 'header is not a part of any spec, it is not reliable' ), stacklevel=2) return self.environ.get( 'HTTP_X_REQUESTED_WITH', '' ).lower() == 'xmlhttprequest' is_secure = property(lambda x: x.environ['wsgi.url_scheme'] == 'https', doc='`True` if the request is secure.') is_multithread = environ_property('wsgi.multithread', doc=''' boolean that is `True` if the application is served by a multithreaded WSGI server.''') is_multiprocess = environ_property('wsgi.multiprocess', doc=''' boolean that is `True` if the application is served by a WSGI server that spawns multiple processes.''') is_run_once = environ_property('wsgi.run_once', doc=''' boolean that is `True` if the application will be executed only once in a process lifetime. This is the case for CGI for example, but it's not guaranteed that the execution only happens one time.''')
class FieldFile(File): def __init__(self, instance, field, name): super().__init__(None, name) self.instance = instance self.field = field self.storage = field.storage self._committed = True def __eq__(self, other): # Older code may be expecting FileField values to be simple strings. # By overriding the == operator, it can remain backwards compatibility. if hasattr(other, 'name'): return self.name == other.name return self.name == other def __hash__(self): return hash(self.name) # The standard File contains most of the necessary properties, but # FieldFiles can be instantiated without a name, so that needs to # be checked for here. def _require_file(self): if not self: raise ValueError("The '%s' attribute has no file associated with it." % self.field.name) def _get_file(self): self._require_file() if not hasattr(self, '_file') or self._file is None: self._file = self.storage.open(self.name, 'rb') return self._file def _set_file(self, file): self._file = file def _del_file(self): del self._file file = property(_get_file, _set_file, _del_file) @property def path(self): self._require_file() return self.storage.path(self.name) @property def url(self): self._require_file() return self.storage.url(self.name) @property def size(self): self._require_file() if not self._committed: return self.file.size return self.storage.size(self.name) def open(self, mode='rb'): self._require_file() if hasattr(self, '_file') and self._file is not None: self.file.open(mode) else: self.file = self.storage.open(self.name, mode) return self # open() doesn't alter the file's contents, but it does reset the pointer open.alters_data = True # In addition to the standard File API, FieldFiles have extra methods # to further manipulate the underlying file, as well as update the # associated model instance. 
def save(self, name, content, save=True): name = self.field.generate_filename(self.instance, name) self.name = self.storage.save(name, content, max_length=self.field.max_length) setattr(self.instance, self.field.name, self.name) self._committed = True # Save the object because it has changed, unless save is False if save: self.instance.save() save.alters_data = True def delete(self, save=True): if not self: return # Only close the file if it's already open, which we know by the # presence of self._file if hasattr(self, '_file'): self.close() del self.file self.storage.delete(self.name) self.name = None setattr(self.instance, self.field.name, self.name) self._committed = False if save: self.instance.save() delete.alters_data = True @property def closed(self): file = getattr(self, '_file', None) return file is None or file.closed def close(self): file = getattr(self, '_file', None) if file is not None: file.close() def __getstate__(self): # FieldFile needs access to its associated model field and an instance # it's attached to in order to work properly, but the only necessary # data to be pickled is the file's name itself. Everything else will # be restored later, by FileDescriptor below. return {'name': self.name, 'closed': False, '_committed': True, '_file': None}
class File(FileProxyMixin): DEFAULT_CHUNK_SIZE = 64 * 2 ** 10 def __init__(self, file, name=None): self.file = file if name is None: name = getattr(file, 'name', None) self.name = name if hasattr(file, 'mode'): self.mode = file.mode def __str__(self): return self.name or '' def __repr__(self): return "<%s: %s>" % (self.__class__.__name__, self or "None") def __bool__(self): return bool(self.name) def __len__(self): return self.size def _get_size_from_underlying_file(self): if hasattr(self.file, 'size'): return self.file.size if hasattr(self.file, 'name'): try: return os.path.getsize(self.file.name) except (OSError, TypeError): pass if hasattr(self.file, 'tell') and hasattr(self.file, 'seek'): pos = self.file.tell() self.file.seek(0, os.SEEK_END) size = self.file.tell() self.file.seek(pos) return size raise AttributeError("Unable to determine the file's size.") def _get_size(self): if hasattr(self, '_size'): return self._size self._size = self._get_size_from_underlying_file() return self._size def _set_size(self, size): self._size = size size = property(_get_size, _set_size) def chunks(self, chunk_size=None): """ Read the file and yield chunks of ``chunk_size`` bytes (defaults to ``UploadedFile.DEFAULT_CHUNK_SIZE``). """ if not chunk_size: chunk_size = self.DEFAULT_CHUNK_SIZE try: self.seek(0) except (AttributeError, UnsupportedOperation): pass while True: data = self.read(chunk_size) if not data: break yield data def multiple_chunks(self, chunk_size=None): """ Return ``True`` if you can expect multiple chunks. NB: If a particular file representation is in memory, subclasses should always return ``False`` -- there's no good reason to read from memory in chunks. 
""" if not chunk_size: chunk_size = self.DEFAULT_CHUNK_SIZE return self.size > chunk_size def __iter__(self): # Iterate over this file-like object by newlines buffer_ = None for chunk in self.chunks(): for line in chunk.splitlines(True): if buffer_: if endswith_cr(buffer_) and not equals_lf(line): # Line split after a \r newline; yield buffer_. yield buffer_ # Continue with line. else: # Line either split without a newline (line # continues after buffer_) or with \r\n # newline (line == b'\n'). line = buffer_ + line # buffer_ handled, clear it. buffer_ = None # If this is the end of a \n or \r\n line, yield. if endswith_lf(line): yield line else: buffer_ = line if buffer_ is not None: yield buffer_ def __enter__(self): return self def __exit__(self, exc_type, exc_value, tb): self.close() def open(self, mode=None): if not self.closed: self.seek(0) elif self.name and os.path.exists(self.name): self.file = open(self.name, mode or self.mode) else: raise ValueError("The file cannot be reopened.") return self def close(self): self.file.close()
# coding:utf-8

from flask import Flask, render_template, request, redirect, url_for
from werkzeug.utils import secure_filename
import os

app = Flask(__name__)

@app.route('/upload', methods=['POST', 'GET'])
def upload():
    if request.method == 'POST':
        f = request.files['file']
        basepath = os.path.dirname(__file__)  # directory containing this file
        # Note: the target folder must be created beforehand, otherwise save() reports that the path does not exist.
        # The path parts are passed separately so that no '\u' escape ends up inside a string literal.
        upload_path = os.path.join(basepath, 'static', 'uploads', secure_filename(f.filename))
        f.save(upload_path)
        return redirect(url_for('upload'))
    return render_template('upload.html')

if __name__ == '__main__':
    app.run(debug=True)

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
    <h1>File upload example</h1>
    <form action="" enctype='multipart/form-data' method='POST'>
        <input type="file" name="file">
        <input type="submit" value="Upload">
    </form>
</body>
</html>
@app.route('/uploader', methods=['POST'])
def uploader():
    file = request.files.get('filename')
    # Save into the upload folder under the name the client sent
    file.save(os.path.join('uploader_folder', file.filename))
    return jsonify({"msg": 'ok'})

@app.route('/get_audio/<filename>')
def get_audio(filename):
    file = os.path.join("audio", filename)
    return send_file(file)
from flask import Flask, request, render_template
from geventwebsocket.websocket import WebSocket
from geventwebsocket.handler import WebSocketHandler
from gevent.pywsgi import WSGIServer
import time
import json

app = Flask(__name__)

@app.route('/chat')
def chat():
    # user_socket = request.environ.get("wsgi.websocket")  # type:WebSocket
    # print(nickname, ":", request.environ.get("REMOTE_ADDR"))
    #
    # msg = user_socket.receive()  # receive data
    # print(f"user_name:{nickname}Say: {msg}")
    #
    # print(f"message sent successfully: {user_socket.send(json.dumps(str(time.time())))}")
    return render_template('chat.html')

@app.route('/ws/<nickname>')
def ws(nickname):
    print('visited')
    user_socket = request.environ.get("wsgi.websocket")  # type: WebSocket
    print(nickname, ":", request.environ.get("REMOTE_ADDR"))
    while 1:
        msg = user_socket.receive()  # receive data
        print(f"user_name:{nickname}Say: {msg or 'no message'}")
        print(msg)
        if msg:
            print(f"message sent successfully: {user_socket.send(json.dumps(str(time.time())))}")
        msg = None

if __name__ == '__main__':
    # app.run(debug=True)
    http_server = WSGIServer(('localhost', 5005), application=app, handler_class=WebSocketHandler)
    http_server.serve_forever()
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<h1>Chat room</h1>
<button type="button" onclick="init_login_chat()">Log in</button>
<button type="button" onclick="send_msg()">Send</button>
<script type="text/javascript">
    let nickname = 'web-page-client';
    let ws = null;
    ws = new WebSocket("ws://localhost:5005/ws/" + nickname);
    ws.onopen = function () {
        console.log('connected');
        ws.send('nlsdjflasdj');
        console.log('send')
    };

    function init_login_chat() {
        // ws = new WebSocket("ws://localhost:5005/ws/" + nickname);
        // ws.onopen = function () {
        //     console.log('connected');
        //     ws.send('nlsdjflasdj');
        //     console.log('send')
        // }
        ws.send('a');
    }

    // console.log(ws);
    function receive_msg() {
        ws.onmessage = function (res) {
            let msg = JSON.parse(res.data);
            console.log(msg);
        };
    }

    function send_msg() {
        // alert('df');
        ws.send(JSON.stringify('hello there'));
    }

    // window.onbeforeunload= function(event) { return confirm("Leave this page?"); }
    window.onbeforeunload = function (e) {
        // ws.onclick = function () {
        //     alert('exit');
        // };
        return alert('sdfsd');
    }
</script>
</body>
</html>
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<p><input type="text" id="nickname">
    <button onclick="create_conn()">Log in to the chat room</button>
</p>
<p><input type="text" id="content">
    <button onclick="send_msg()">Send message</button>
</p>
<div id="message" style="width: 300px; border:2px solid red;">
</div>
</body>
<script type="text/javascript">
    var ws = null;
    var nickname = null;

    function create_conn() {
        nickname = document.getElementById("nickname").value;
        ws = new WebSocket("ws://192.168.15.108:9527/ws/" + nickname);
        ws.onopen = function () {
            alert("Welcome to 九聊");
        };
        ws.onmessage = function (data) {
            var ws_msg = JSON.parse(data.data);
            create_chat(ws_msg);
        };
    }

    function create_chat(text, self) {
        var ptag = document.createElement("p");
        if (self == "w") {
            ptag.style.cssText = "text-align: right";
            ptag.innerText = text.msg + ":" + text.sender;
        } else {
            ptag.style.cssText = "text-align: left";
            ptag.innerText = text.sender + ":" + text.msg;
        }
        document.getElementById("message").appendChild(ptag);
    }

    function send_msg() {
        var msg = document.getElementById("content").value;
        // msg_json was commented out in the notes but is still sent below, so it has to be defined
        var msg_json = JSON.stringify({"to_user": "", "msg": msg});
        ws.send(msg_json);
        create_chat({sender: nickname, msg: msg}, "w");
    };
</script>
</html>
@app.route('/delete/<int:nid>')
def delete(nid):
    return redirect(url_for('index'))
1 Decorators
from functools import wraps
def auth(func):
    @wraps(func)  # same as @functools.wraps(func): copies the original function's metadata (__name__, __doc__) onto inner; without it that metadata is lost
    def inner(*args, **kwargs):
        if not session.get('user'):
            return redirect(url_for('login'))
        return func(*args, **kwargs)
    return inner
2
endpoint defaults to the view function's name; its purpose is reverse URL generation with url_for
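Because the endpoint defaults to the function name, a decorator that forgets wraps makes every decorated view look like it is called inner, and Flask then refuses to register the second one under the same endpoint. A minimal sketch of the name problem using only the standard library (the decorator and view names here are made up for illustration):

```python
from functools import wraps

def auth_without_wraps(func):
    # No wraps: the returned function keeps its own name, "inner"
    def inner(*args, **kwargs):
        return func(*args, **kwargs)
    return inner

def auth_with_wraps(func):
    @wraps(func)  # copies __name__, __doc__ and friends from func onto inner
    def inner(*args, **kwargs):
        return func(*args, **kwargs)
    return inner

@auth_without_wraps
def index():
    return "index"

@auth_with_wraps
def home():
    return "home"

print(index.__name__)  # inner
print(home.__name__)   # home
```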
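3 Decorator order
@app.route('/login', methods=["GET", "POST"])  # route goes on top so that it registers the function already wrapped by auth
@auth
def login():
    if request.method == "":
        pass
    return render_template('login.html', error="error message")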
4 before_request
@app.before_request  # special decorator
def name():
    print('every request passes through here first')  # handy for applying one rule to many view functions
    if request.path == '/login':  # current path
        return None
    if session.get('user'):
        return None
    return redirect('/login')
from flask import Markup
Option 1: mark the string as safe in Python
input = Markup('<input type="text" />')
Option 2: mark it as safe in the template
{{ input|safe }}
Special decorators
@app.template_global()  # registers glo as a global template function
def glo(a, b):
    return a + b
@app.template_filter()
def fil(a, b, c):
    return a + b + c
{{ 1|fil(2,3) }}
{% extends '.html' %}  (template inheritance)
{% block content_name %}
{% endblock %}
[
Defining a macro
{% macro hong_name(name, type='text', value='') %}
<h1>Macro</h1>
<input type="{{ type }}" name="{{ name }}" value="{{ value }}">
<input type='submit' value="submit">
{% endmacro %}
Calling the macro
{{ hong_name('n1') }}
]
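Template rendering
- Basic data types: Python syntax can be used on them in the template
- Passing in a function:
    - Django: called automatically
    - Flask: not called automatically, the template has to call it
- Globally defined functions
@app.template_global()
def name(a, b):
    return a + b
@app.template_filter()
def name(a, b, c):
    return a + b + c
{{ a|name(b,c) }}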
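session
When a request arrives, Flask reads the session value from the cookie, verifies its signature, and deserializes it into a dict.
When the request ends, Flask takes the dict held in memory, serializes and signs it, and writes it to the user's cookie. The value is signed rather than encrypted: the client can read it but cannot change it without being detected.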
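The read-verify-deserialize and serialize-sign-write steps can be sketched with the standard library alone. This is a simplified illustration, not Flask's real implementation (Flask delegates to itsdangerous, which adds a timestamp, key derivation and a salt); SECRET_KEY, dumps and loads are hypothetical names.

```python
import base64
import hashlib
import hmac
import json

SECRET_KEY = b"demo-secret"  # hypothetical key, stands in for app.secret_key

def dumps(session):
    """Serialize the dict and append an HMAC signature."""
    payload = base64.urlsafe_b64encode(
        json.dumps(session, sort_keys=True).encode("utf-8"))
    sig = hmac.new(SECRET_KEY, payload, hashlib.sha1).hexdigest().encode("ascii")
    return (payload + b"." + sig).decode("ascii")

def loads(cookie):
    """Verify the signature; return an empty session if it does not match."""
    payload, _, sig = cookie.encode("ascii").rpartition(b".")
    expected = hmac.new(SECRET_KEY, payload, hashlib.sha1).hexdigest().encode("ascii")
    if not hmac.compare_digest(sig, expected):
        return {}
    return json.loads(base64.urlsafe_b64decode(payload).decode("utf-8"))

cookie = dumps({"user": "alice"})
print(loads(cookie))        # the original dict comes back
print(loads("x" + cookie))  # tampered cookie: signature check fails, empty dict
```

Anyone can base64-decode the part before the dot, which is why secrets should not be stored in a cookie-based session.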
Flashing
from flask import flash, get_flashed_messages
flash('temporary data')  # store a value temporarily
print(get_flashed_messages())  # read the values stored above; judging by the source it simply pops them
flash('temporary data', 'error')
flash('sdfsdsdfsd', 'info')
# Fetch by category
get_flashed_messages(category_filter=['error'])  # returns only the messages of category error, info is left out
Flashing: a value is stored in the session and removed with pop when it is read.
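The store-then-pop behaviour can be sketched without Flask. Here a plain dict stands in for the session, and the two functions are simplified look-alikes written for this note (real Flask also caches the popped list on the request context so repeated calls within one request see the same messages).

```python
session = {}  # stands in for Flask's per-user session dict (assumption)

def flash(message, category="message"):
    # Append (category, message) to a list kept inside the session
    session.setdefault("_flashes", []).append((category, message))

def get_flashed_messages(category_filter=()):
    # pop removes the list from the session, so messages are read only once
    flashes = session.pop("_flashes", [])
    if category_filter:
        flashes = [f for f in flashes if f[0] in category_filter]
    return [message for _, message in flashes]

flash("bad password", "error")
flash("welcome back", "info")
print(get_flashed_messages(category_filter=["error"]))  # ['bad password']
print(get_flashed_messages())                           # []
```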
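Middleware
-- When is __call__ triggered?
   Only when a user sends a request
-- Task: do one thing before __call__ runs and another after it returns
class Middleware(object):
    def __init__(self, old):
        self.old = old
    def __call__(self, *args, **kwargs):
        ret = self.old(*args, **kwargs)
        return ret
# How do Django middleware and Flask middleware differ?
# What is Flask middleware, and what is it for?
if __name__ == "__main__":
    app.wsgi_app = Middleware(app.wsgi_app)
    # Wrap wsgi_app, not __call__: assigning app.__call__ on the instance has no effect,
    # because Python looks up __call__ on the class when the object is called.
    app.run()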
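The wrapping idea works for any WSGI callable, so it can be tried without starting a server. A minimal sketch: simple_app stands in for app.wsgi_app, and the log list and fake_start_response are illustrative helpers, not part of Flask.

```python
def simple_app(environ, start_response):
    # Plays the role of app.wsgi_app in this sketch
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [b"hello"]

class Middleware(object):
    def __init__(self, old):
        self.old = old
        self.log = []

    def __call__(self, environ, start_response):
        self.log.append("before " + environ.get("PATH_INFO", ""))  # runs before the app
        ret = self.old(environ, start_response)
        self.log.append("after")                                   # runs after the app
        return ret

wrapped = Middleware(simple_app)
captured = {}

def fake_start_response(status, headers):
    captured["status"] = status

body = wrapped({"PATH_INFO": "/index"}, fake_start_response)
print(body, captured, wrapped.log)
```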
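Special decorators
before_request
after_request
template_global
template_filter
before_first_request
errorhandler
@app.errorhandler(404)
def not_found(arg):
    print(arg)
    return 'not found'
@app.before_request
def x1():
    print('before')
@app.after_request
def x2(response):
    print('after')
    return response
before: takes no arguments; returning None lets the request continue, any other return value is used as the response
after: takes the response as its argument and must return it
Execution order: before ---> view_function ----> after
before: defined first, runs first
after: the list is reversed internally, so the one defined last runs first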
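The ordering rules can be reproduced with two plain lists. This is a toy model of the dispatch loop written for these notes, not Flask's code; before_request, after_request and handle are hypothetical stand-ins.

```python
before_funcs, after_funcs, calls = [], [], []

def before_request(f):
    before_funcs.append(f)
    return f

def after_request(f):
    after_funcs.append(f)
    return f

@before_request
def b1():
    calls.append("b1")

@before_request
def b2():
    calls.append("b2")

@after_request
def a1(response):
    calls.append("a1")
    return response

@after_request
def a2(response):
    calls.append("a2")
    return response

def view():
    calls.append("view")
    return "response"

def handle(view_func):
    response = None
    for f in before_funcs:            # definition order
        response = f()
        if response is not None:      # a before hook answered, skip the view
            break
    if response is None:
        response = view_func()
    for f in reversed(after_funcs):   # reversed: last defined runs first
        response = f(response)
    return response

handle(view)
print(calls)  # ['b1', 'b2', 'view', 'a2', 'a1']
```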
@app.before_first_request
def one():
    print('I only run once!')
from flask import Flask, request
request:
request.args    GET parameters from the URL query string
request.form    POST form data from the request body
request.values  all parameters combined (URL and form data)
request.data
request.json
request.files
{{ }} expressions (output, no logic)
{% %} logic (statements such as if and for)
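from flask import Flask, render_template, redirect, send_file
app = Flask(__name__)
app.run(debug=True)  # has the same effect as setting app.config['DEBUG'] = True
app.config['DEBUG'] = True  # development mode: the server restarts when the code changes
app.config['TESTING'] = True  # testing mode: exceptions propagate instead of being handled by the app's error handlers
app = Flask(__name__,
    template_folder='templates'
)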
Flask __init__()
def __init__(
self,
import_name,
static_url_path=None,
static_folder='static',
static_host=None,
host_matching=False,
subdomain_matching=False,
template_folder='templates',
instance_path=None,
instance_relative_config=False,
root_path=None
):
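Day 2
1 Routing + views
2 How session works
3 Blueprints (structuring a Flask project's directories)
4 threading.local [context management]
5 Context management (first pass)
How do Django and Flask differ?
In common: both depend on WSGI
Different: Django hands request data along explicitly, as an argument passed from call to call
Flask: puts the request data somewhere first and fetches it from there when needed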
What is WSGI?
The Web Server Gateway Interface. It is just a protocol; modules that implement it include:
- wsgiref
- werkzeug
A module implementing the protocol is essentially a socket server that receives user requests and processes them.
Web frameworks are generally built on top of WSGI, which gives a separation of concerns
from wsgiref.simple_server import make_server
def run_server(environ, start_response):
    start_response('200 OK', [('Content-Type', 'text/html')])
    return [bytes('<h1>hello,web!</h1>', encoding='utf-8'), ]
if __name__ == "__main__":
    httpd = make_server('', 8000, run_server)
    print("Serving HTTP on port 8000 ...")
    httpd.serve_forever()
02 python fullstack s9day116 內容回顧.mp4
from werkzeug.wrappers import Response
from werkzeug.serving import run_simple
class Flask(object):
    def __call__(self, environ, start_response):
        response = Response('hello')
        return response(environ, start_response)
    def run(self):
        run_simple('127.0.0.1', 8000, self)
app = Flask()
if __name__ == '__main__':
    app.run()
"""A decorator that is used to register a view function for a
given URL rule. This does the same thing as :meth:`add_url_rule`
but is intended for decorator usage::
@app.route('/')
def index():
return 'Hello World'
For more information refer to :ref:`url-route-registrations`.
:param rule: the URL rule as string
:param endpoint: the endpoint for the registered URL rule. Flask
itself assumes the name of the view function as
endpoint
:param options: the options to be forwarded to the underlying
:class:`~werkzeug.routing.Rule` object. A change
to Werkzeug is handling of method options. methods
is a list of methods this rule should be limited
to (``GET``, ``POST`` etc.). By default a rule
just listens for ``GET`` (and implicitly ``HEAD``).
Starting with Flask 0.6, ``OPTIONS`` is implicitly
added and handled by the standard request handling.
"""
def route(self, rule, **options):
    def decorator(f):
        endpoint = options.pop('endpoint', None)
        self.add_url_rule(rule, endpoint, f, **options)
        return f
    return decorator
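To see why the decorator form and a direct add_url_rule call are equivalent, here is a toy registry built on the same closure pattern. MiniApp and its dispatch method are invented for this sketch; real Flask stores werkzeug Rule objects in a Map rather than a plain dict.

```python
class MiniApp(object):
    def __init__(self):
        self.url_map = {}          # rule -> endpoint
        self.view_functions = {}   # endpoint -> view function

    def add_url_rule(self, rule, endpoint=None, view_func=None):
        if endpoint is None:
            endpoint = view_func.__name__  # default endpoint is the function name
        self.url_map[rule] = endpoint
        self.view_functions[endpoint] = view_func

    def route(self, rule, **options):
        def decorator(f):
            endpoint = options.pop("endpoint", None)
            self.add_url_rule(rule, endpoint, f)
            return f  # the function itself is returned unchanged
        return decorator

    def dispatch(self, path):
        return self.view_functions[self.url_map[path]]()

app = MiniApp()

@app.route("/index")
def index():
    return "index"

def home():
    return "home"

app.add_url_rule("/home", "home_page", home)

print(app.dispatch("/index"), app.dispatch("/home"))
print(app.url_map)
```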
def tiankong():
    print('tiankong !')
def priName(arg):
    print("priName({})".format(arg.__name__))
    print("print:{}".format(arg.__name__))
priName(tiankong)
Two ways to register a route:
@app.route('/index')  # '/index' is the URL; it is a separate concept from the function name below
def index():
    return "index"
def index():
    return "index"
app.add_url_rule("/index", None, index)
# arguments: url, endpoint, view function
Subdomains
from flask import Flask, views, url_for
app = Flask(import_name=__name__)
app.config['SERVER_NAME'] = 'wupeiqi.com:5000'  # this setting is required
# The view below is triggered when the user visits admin.wupeiqi.com (a second-level domain)
@app.route('/', subdomain="admin")
def static_index():
    pass
# Any host of the form http://xxx.wupeiqi.com:5000/ is routed here
@app.route('/dynamic', subdomain="<username>")
def username_index(username):
    return username + '.your~domain.tld'
from flask import Flask, views
app = Flask(__name__)
@app.route('/index', redirect_to='/new')
def index():  # the old system
    pass
CBV (class-based views)
from flask import views
from functools import wraps  # keeps the metadata of decorated functions; wrapper below is your own decorator
class UserView(views.MethodView):
    methods = ['GET']  # which of GET / POST are allowed
    decorators = [wrapper, ]  # the decorators listed here are applied automatically
    def get(self, *args, **kwargs):
        return "GET"
    def post(self, *args, **kwargs):
        return 'POST'
app.add_url_rule('/user', None, UserView.as_view(''))
Custom regex converter
from flask import Flask
app = Flask(__name__)
from werkzeug.routing import BaseConverter
class RegexConverter(BaseConverter):
    '''
    Custom regular-expression matching for URLs
    '''
    def __init__(self, map, regex):
        super(RegexConverter, self).__init__(map)
        self.regex = regex
    .....
    pass
app.url_map.converters['reg'] = RegexConverter
# Is this registering or adding? (It adds an entry to the converters dict, which registers the name 'reg'.)
day30 month9 am10
from flask import Flask
app = Flask(__name__)
# First decorator = app.route('/index') runs [route returns a function, which is then called as decorator(view_function)]
# @decorator
@app.route('/index')
def index():
    return "index"
[ Source
def route(self, rule, **options):
    def decorator(f):
        endpoint = options.pop('endpoint', None)
        self.add_url_rule(rule, endpoint, f, **options)
        # rule      the URL path
        # endpoint  alias used for reverse URL lookup
        # f         the view function
        # options   the remaining keyword arguments
        # self      the app, i.e. app = Flask(__name__)
        return f
    return decorator
]
# At its core the routing system is self.add_url_rule(rule, endpoint, f, **options)
#
# A route can also be registered like this
def index():
    return 'a route can also be registered like this'
app.add_url_rule('/index', None, index)
# Django-style route table in Flask
def index():
    return 'it is really just a for loop'
routes = [
    ('/xxx', index,),
    ('/...', ...,),
    ....
]
for rule, view_func in routes:  # not necessarily the right way
    app.add_url_rule(rule, None, view_func)
[ Source
@setupmethod
def add_url_rule(self, rule, endpoint=None, view_func=None, provide_automatic_options=None, **options):
    if endpoint is None:
        endpoint = _endpoint_from_view_func(view_func)
    options['endpoint'] = endpoint
    methods = options.pop('methods', None)
    if methods is None:
        methods = getattr(view_func, 'methods', None) or ('GET',)
    if isinstance(methods, string_types):
        raise TypeError('(this error message is rather long)')
    methods = set(item.upper() for item in methods)
    required_methods = set(getattr(view_func, 'required_methods', ()))
    ...
    (a lot more follows)
def _endpoint_from_view_func(view_func):
    assert view_func is not None, 'expected view func if endpoint is not provided.'
    return view_func.__name__  # the function name; view_func is the view function that was passed in
]
route parameters
from flask import Flask
app = Flask(__name__)
@app.route(rule='/index', endpoint='', methods=['GET','POST'], defaults={'k':'v'},)
def index(k):
    return 'sample'
add_url_rule(self, rule, endpoint=None, view_func=None, provide_automatic_options=None, **options):
rule            the URL rule
view_func       the view function
endpoint=None   name used for reverse URL generation, i.e. url_for('name')
methods=None    allowed request methods, e.g. ['GET','POST']; only GET is enabled by default, so POST has to be listed explicitly
defaults=None   default values; when the URL has no parameter but the function needs one, defaults={'k':'value'} supplies it
strict_slashes=None   whether the trailing / of the URL is enforced strictly,
    e.g. www.baidu.com/yanyi versus www.baidu.com/yanyi/
    @app.route('/index', strict_slashes=True)
    matches only www.baidu.com/index
redirect_to=None   redirect to the given address
    e.g. @app.route('/index/<int:nid>', redirect_to='/home/<nid>')
    or
    def func(adapter, nid):
        return '/home/8989'
    @app.route('/index/<int:nid>', redirect_to=func)
subdomain=None   subdomain access
from flask import Flask, views, url_for
app = Flask(__name__)
# SERVER_NAME must be configured first
app.config['SERVER_NAME'] = 'xiarenwang.com:8888'
# Local machine
# app.config['SERVER_NAME'] = 'localhost:8080'
@app.route('/', subdomain='home')
def index():
    return 'home.xiarenwang.com'
# In the browser: http://index.localhost:8080/
@app.route('/dynamic', subdomain="<username>")
def username_index(username):
    return username + 'your-domain.xx'
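Changing the mapping between IP addresses and domain names
On Windows:
path: C:\Windows\System32\drivers\etc\hosts
Open the hosts file and add the following mappings
127.0.0.1 xiarenwang.com
127.0.0.1 index.xiarenwang.com
127.0.0.1 admin.xiarenwang.com
# Subdomains still unreachable after this? Since Flask 1.0 subdomain matching is off by default and setting SERVER_NAME no longer enables it; create the app with Flask(__name__, subdomain_matching=True).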
* Serving Flask app "a" (lazy loading)
* Environment: production
WARNING: Do not use the development server in a production environment.
Use a production WSGI server instead.
* Debug mode: off
CBV
class UserView(views.MethodView):
    methods = ['GET', 'POST']  # allow GET and POST requests
    decorators = [wrapper, ]  # apply these decorators in bulk
    def get(self, *args, **kwargs):
        return "GET"
    def post(self, *args, **kwargs):
        return "POST"
app.add_url_rule('/user', None, UserView.as_view('xxxx'))
[
@classmethod
def as_view(cls, name, *class_args, **class_kwargs):
    def view(*args, **kwargs):
        self = view.view_class(*class_args, **class_kwargs)
        return self.dispatch_request(*args, **kwargs)
    if cls.decorators:
        view.__name__ = name
        .....
    ....
    return view
]
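Custom regex converter
from flask import Flask
from werkzeug.routing import BaseConverter
app = Flask(__name__)
# Define the custom converter class
class RegexConverter(BaseConverter):
    """
    Custom regular-expression matching for URLs
    """
    def __init__(self, map, regex):  # map is passed in automatically
        super(RegexConverter, self).__init__(map)
        self.regex = regex
        # The routing code reads self.regex and matches the URL segment against the pattern you wrote
    def to_python(self, value):
        """
        Called when the route matches; the return value is passed to the view function as its argument
        """
        return int(value)
    def to_url(self, value):  # triggered by reverse URL generation
        """
        Called when url_for builds a URL; the argument passes through here and the return value becomes the URL parameter
        """
        val = super(RegexConverter, self).to_url(value)
        return val
app.url_map.converters['reg'] = RegexConverter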
# This registers RegexConverter
# 'reg' is added alongside the existing converters
[ # conceptually, as if it were added to the DEFAULT_CONVERTERS dict
#: the default converter mapping for the map.
DEFAULT_CONVERTERS = {
    '':''
    ...
    'reg': RegexConverter
}
]
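@app.route(r'/index/<reg("\d+"):nid>')
def index(nid):
    return "index"
# reg stands for the RegexConverter class; RegexConverter() is instantiated for the rule
#
# Flow
1. The user sends a request
2. Flask matches the URL against the regex internally
3. to_python(the matched text) is called
4. The return value of to_python is handed to the view function as its argument
@app.route(r'/index/<reg("\d+"):nid>')
def index(nid):
    print(nid, type(nid))
    return "index"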
from flask import url_for
@app.route(r'/index/<reg("\d+"):nid>')
def index(nid):
    print(url_for('index', nid=8888))
    # With nid=8888, reverse generation first passes nid to to_url;
    # to_url(value) receives nid as value, and the value it returns is what ends up in the generated URL
    return "index"
if __name__ == "__main__":
    app.run()
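ImmutableDict() ?????  (werkzeug's read-only dict; copy() returns an ordinary mutable dict, which is why the assignment on the next line works)
app.url_map.converters['xx'] = XxConverter
converters = default_converters.copy()
default_converters = ImmutableDict(DEFAULT_CONVERTERS)
DEFAULT_CONVERTERS = {
    # built-in converters
}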
How Session works (source code)
from flask import Flask
# Instantiate the app
app = Flask(__name__)
# Register a route
@app.route('/index')
def index():
    return "index"
if __name__ == "__main__":
    app.run()  # essentially starts a socket server
    app.__call__  # executed when a user request arrives
    app.wsgi_app
    app.request_class
    app.session_interface
# What registering a route really does:
app.url_map = Map()  # a Map object
# It holds all the route relationships, conceptually:
app.url_map =
[
('/index', index)
]
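# environ holds all the request data [given a first wrapping by the WSGI server]
# start_response is used to set the response data
[
def __call__(self, environ, start_response):
    return self.wsgi_app(environ, start_response)
]
When is __call__ triggered, and why? (It runs whenever the object is called with (); here the WSGI server calls app(environ, start_response) for each request.)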
[
def wsgi_app(self, environ, start_response):
    ctx = self.request_context(environ)
    # 1. Take environ and wrap it once more
    # 2. Read the cookie named session from environ, verify its signature, deserialize it
    # 3. (left unfinished in the notes)
    error = None
    try:
        try:
            ctx.push()
            # 4. Run the view function
            response = self.full_dispatch_request()
        except Exception as e:
            error = e
            response = self.handle_exception(e)
        except:
            error = sys.exc_info()[1]
            raise
        return response(environ, start_response)
    finally:
        if self.should_ignore_error(error):
            error = None
        """
        5. Take the session, serialize and sign it, write it to the cookie
        6. Clear the data
        """
        ctx.auto_pop(error)
]
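sys.exc_info() ????????  (returns the tuple (type, value, traceback) for the exception currently being handled; index [1] is the exception instance)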
[
def request_context(self, environ):
    return RequestContext(self, environ)
class RequestContext(object):
    def __init__(self, app, environ, request=None):
        self.app = app
        if request is None:
            request = app.request_class(environ)
        self.request = request
        self.url_adapter = app.create_url_adapter(self.request)
        self.flashes = None
        self.session = None
    .....
        if self.session is None:
            session_interface = self.app.session_interface
            self.session = session_interface.open_session(self.app, self.request)
            if self.session is None:
                self.session = session_interface.make_null_session(self.app)
# code more
from .wrappers import Request
class Flask(_PackageBoundObject):
    request_class = Request
    ....
    session_interface = SecureCookieSessionInterface()
# code more
class SecureCookieSessionInterface(SessionInterface):
    pass
    def open_session(self, app, request):
        s = self.get_signing_serializer(app)
        if s is None:
            return None
        val = request.cookies.get(app.session_cookie_name)
        if not val:
            return self.session_class()
        max_age = total_seconds(app.permanent_session_lifetime)
        try:
            data = s.loads(val, max_age=max_age)
            return self.session_class(data)
        except BadSignature:
            return self.session_class()
class SessionInterface(object):
    pass
]
Homework: draw the flow chart????
class SessionInterface(object): """The basic interface you have to implement in order to replace the default session interface which uses werkzeug's securecookie implementation. The only methods you have to implement are :meth:`open_session` and :meth:`save_session`, the others have useful defaults which you don't need to change. The session object returned by the :meth:`open_session` method has to provide a dictionary like interface plus the properties and methods from the :class:`SessionMixin`. We recommend just subclassing a dict and adding that mixin:: class Session(dict, SessionMixin): pass If :meth:`open_session` returns ``None`` Flask will call into :meth:`make_null_session` to create a session that acts as replacement if the session support cannot work because some requirement is not fulfilled. The default :class:`NullSession` class that is created will complain that the secret key was not set. To replace the session interface on an application all you have to do is to assign :attr:`flask.Flask.session_interface`:: app = Flask(__name__) app.session_interface = MySessionInterface() .. versionadded:: 0.8 """ #: :meth:`make_null_session` will look here for the class that should #: be created when a null session is requested. Likewise the #: :meth:`is_null_session` method will perform a typecheck against #: this type. null_session_class = NullSession #: A flag that indicates if the session interface is pickle based. #: This can be used by Flask extensions to make a decision in regards #: to how to deal with the session object. #: #: .. versionadded:: 0.10 pickle_based = False def make_null_session(self, app): """Creates a null session which acts as a replacement object if the real session support could not be loaded due to a configuration error. This mainly aids the user experience because the job of the null session is to still support lookup without complaining but modifications are answered with a helpful error message of what failed. 
This creates an instance of :attr:`null_session_class` by default. """ return self.null_session_class() def is_null_session(self, obj): """Checks if a given object is a null session. Null sessions are not asked to be saved. This checks if the object is an instance of :attr:`null_session_class` by default. """ return isinstance(obj, self.null_session_class) def get_cookie_domain(self, app): """Returns the domain that should be set for the session cookie. Uses ``SESSION_COOKIE_DOMAIN`` if it is configured, otherwise falls back to detecting the domain based on ``SERVER_NAME``. Once detected (or if not set at all), ``SESSION_COOKIE_DOMAIN`` is updated to avoid re-running the logic. """ rv = app.config['SESSION_COOKIE_DOMAIN'] # set explicitly, or cached from SERVER_NAME detection # if False, return None if rv is not None: return rv if rv else None rv = app.config['SERVER_NAME'] # server name not set, cache False to return none next time if not rv: app.config['SESSION_COOKIE_DOMAIN'] = False return None # chop off the port which is usually not supported by browsers # remove any leading '.' since we'll add that later rv = rv.rsplit(':', 1)[0].lstrip('.') if '.' not in rv: # Chrome doesn't allow names without a '.' # this should only come up with localhost # hack around this by not setting the name, and show a warning warnings.warn( '"{rv}" is not a valid cookie domain, it must contain a ".".' ' Add an entry to your hosts file, for example' ' "{rv}.localdomain", and use that instead.'.format(rv=rv) ) app.config['SESSION_COOKIE_DOMAIN'] = False return None ip = is_ip(rv) if ip: warnings.warn( 'The session cookie domain is an IP address. This may not work' ' as intended in some browsers. Add an entry to your hosts' ' file, for example "localhost.localdomain", and use that' ' instead.' ) # if this is not an ip and app is mounted at the root, allow subdomain # matching by adding a '.' prefix if self.get_cookie_path(app) == '/' and not ip: rv = '.' 
+ rv app.config['SESSION_COOKIE_DOMAIN'] = rv return rv def get_cookie_path(self, app): """Returns the path for which the cookie should be valid. The default implementation uses the value from the ``SESSION_COOKIE_PATH`` config var if it's set, and falls back to ``APPLICATION_ROOT`` or uses ``/`` if it's ``None``. """ return app.config['SESSION_COOKIE_PATH'] \ or app.config['APPLICATION_ROOT'] def get_cookie_httponly(self, app): """Returns True if the session cookie should be httponly. This currently just returns the value of the ``SESSION_COOKIE_HTTPONLY`` config var. """ return app.config['SESSION_COOKIE_HTTPONLY'] def get_cookie_secure(self, app): """Returns True if the cookie should be secure. This currently just returns the value of the ``SESSION_COOKIE_SECURE`` setting. """ return app.config['SESSION_COOKIE_SECURE'] def get_cookie_samesite(self, app): """Return ``'Strict'`` or ``'Lax'`` if the cookie should use the ``SameSite`` attribute. This currently just returns the value of the :data:`SESSION_COOKIE_SAMESITE` setting. """ return app.config['SESSION_COOKIE_SAMESITE'] def get_expiration_time(self, app, session): """A helper method that returns an expiration date for the session or ``None`` if the session is linked to the browser session. The default implementation returns now + the permanent session lifetime configured on the application. """ if session.permanent: return datetime.utcnow() + app.permanent_session_lifetime def should_set_cookie(self, app, session): """Used by session backends to determine if a ``Set-Cookie`` header should be set for this session cookie for this response. If the session has been modified, the cookie is set. If the session is permanent and the ``SESSION_REFRESH_EACH_REQUEST`` config is true, the cookie is always set. This check is usually skipped if the session was deleted. .. 
versionadded:: 0.11 """ return session.modified or ( session.permanent and app.config['SESSION_REFRESH_EACH_REQUEST'] ) def open_session(self, app, request): """This method has to be implemented and must either return ``None`` in case the loading failed because of a configuration error or an instance of a session object which implements a dictionary like interface + the methods and attributes on :class:`SessionMixin`. """ raise NotImplementedError() def save_session(self, app, session, response): """This is called for actual sessions returned by :meth:`open_session` at the end of the request. This is still called during a request context so if you absolutely need access to the request you can do that. """ raise NotImplementedError() session_json_serializer = TaggedJSONSerializer() class SecureCookieSessionInterface(SessionInterface): """The default session interface that stores sessions in signed cookies through the :mod:`itsdangerous` module. """ #: the salt that should be applied on top of the secret key for the #: signing of cookie based sessions. salt = 'cookie-session' #: the hash function to use for the signature. The default is sha1 digest_method = staticmethod(hashlib.sha1) #: the name of the itsdangerous supported key derivation. The default #: is hmac. key_derivation = 'hmac' #: A python serializer for the payload. The default is a compact #: JSON derived serializer with support for some extra Python types #: such as datetime objects or tuples. 
serializer = session_json_serializer session_class = SecureCookieSession def get_signing_serializer(self, app): if not app.secret_key: return None signer_kwargs = dict( key_derivation=self.key_derivation, digest_method=self.digest_method ) return URLSafeTimedSerializer(app.secret_key, salt=self.salt, serializer=self.serializer, signer_kwargs=signer_kwargs) def open_session(self, app, request): s = self.get_signing_serializer(app) if s is None: return None val = request.cookies.get(app.session_cookie_name) if not val: return self.session_class() max_age = total_seconds(app.permanent_session_lifetime) try: data = s.loads(val, max_age=max_age) return self.session_class(data) except BadSignature: return self.session_class() def save_session(self, app, session, response): domain = self.get_cookie_domain(app) path = self.get_cookie_path(app) # If the session is modified to be empty, remove the cookie. # If the session is empty, return without setting the cookie. if not session: if session.modified: response.delete_cookie( app.session_cookie_name, domain=domain, path=path ) return # Add a "Vary: Cookie" header if the session was accessed at all. if session.accessed: response.vary.add('Cookie') if not self.should_set_cookie(app, session): return httponly = self.get_cookie_httponly(app) secure = self.get_cookie_secure(app) samesite = self.get_cookie_samesite(app) expires = self.get_expiration_time(app, session) val = self.get_signing_serializer(app).dumps(dict(session)) response.set_cookie( app.session_cookie_name, val, expires=expires, httponly=httponly, domain=domain, path=path, secure=secure, samesite=samesite )
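Blueprints: give the project a directory structure
from flask import Flask
app = Flask(__name__)
app
# Error: ValueError: urls must start with a leading slash
# Cause: the / was left out when registering the route
@ac.route('login')  # wrong on purpose: it should be '/login'
def login():
    return 'login'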
A small blueprint example follows
crm
from crm import create_app
app = create_app()
if __name__ == '__main__':
    app.run()
crm folder
from flask import Flask
from .views.account import ac
from .views.user import uc
def create_app():
    app = Flask(__name__)
    @app.before_request
    def filter():
        print('global filter! app-wide hook')
    app.register_blueprint(ac, url_prefix='/api')  # adds a prefix, url_prefix='/api'
    # In the browser: http://127.0.0.1:5000/api/login
    app.register_blueprint(uc)
    return app
# global filter! app-wide hook
# 127.0.0.1 - - [01/Oct/2018 00:20:39] "GET /list HTTP/1.1" 200 -
# local filter! uc hook
# global filter! app-wide hook
# 127.0.0.1 - - [01/Oct/2018 00:20:47] "GET /api/login HTTP/1.1" 200 -
# Special decorators can be applied app-wide or to a single blueprint
# Example: before_request
# This project layout is the small-blueprint example [small applications]
# A large blueprint layout is basically the same as Django's [large applications]
views
from flask import Blueprint
uc = Blueprint('uc', __name__)
@uc.before_request
def filter():
    print('local filter! uc hook')
@uc.route('/list')
def list():
    return 'List'
@uc.route('/detail')
def detail():
    return 'detail'
# Account-related views
from flask import Blueprint, render_template
ac = Blueprint('ac', __name__, template_folder='log')  # sets the template folder; templates are looked up in templates first, and only then in the folder given by template_folder
# ac = Blueprint('ac', __name__, template_folder='log', static_url_path='')  # static files can be configured too (how exactly is this used?)
@ac.route('/login')
def login():
    # return render_template('/log/login.html')  # this works as well
    return render_template('login.html')
@ac.route('/logout')
def logout():
    return 'logout'
templates
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
User login
</body>
</html>
Large blueprint
threading.local: data isolation, keeps the data of different threads from getting mixed up
import threading
from threading import local
import time
obj = local()
def task(i):
    obj.xxxx = i
    time.sleep(1.5)
    print(obj.xxxx, i)
class o(local):
    x = 0
def walk(i):
    o.x = i  # note: this sets a class attribute shared by all threads; isolation needs an instance of o
    time.sleep(2)
    print(o.x, i)
for i in range(10):
    t = threading.Thread(target=task, args=(i,))
    # t = threading.Thread(target=walk, args=(i,))
    t.start()
# Another way to isolate data per thread
dic = {}
def task(i):  # give every thread its own space
    ident = threading.get_ident()  # get the thread id
    if ident in dic:
        dic[ident]['xxx'] = i
    else:
        dic[ident] = {'xxx': i}
import greenlet
# Give every coroutine its own space for data isolation
def walk(i):
    ident = greenlet.getcurrent()  # the current greenlet, used as the coroutine's identifier
    if ident in dic:
        dic[ident]['xxx'] = i
    else:
        dic[ident] = {'xxx': i}
Custom Local
class Local(object):
    DIC = {
    }
    def __getattr__(self, item):  # triggered when reading an attribute that normal lookup does not find
        print('__getattr__', item)
        ident = threading.get_ident()
        if ident in self.DIC:
            return self.DIC[ident].get(item)
        return None
    def __setattr__(self, key, value):  # triggered when setting an attribute
        ident = threading.get_ident()
        if ident in self.DIC:
            self.DIC[ident][key] = value
        else:
            self.DIC[ident] = {key: value}
This custom Local provides the same behaviour
Now upgrade it one step further
import threading
try:
    import greenlet
    get_ident = greenlet.getcurrent
except Exception as e:
    get_ident = threading.get_ident

class Loca(object):
    DIC = {}

    def __getattr__(self, item):
        ident = get_ident()
        if ident in self.DIC:
            return self.DIC[ident].get(item)
        return None

    def __setattr__(self, key, value):
        ident = get_ident()
        if ident in self.DIC:
            self.DIC[ident][key] = value
        else:
            self.DIC[ident] = {key: value}
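To check that a Local of this kind really keeps each thread's data apart, a small harness along these lines may help. It is only a sketch: the names `IsolatedLocal`, `worker` and `results` are made up for the example, and it keys on the thread id only (no greenlet).

```python
import threading
import time


class IsolatedLocal(object):
    """Hypothetical stand-in for the custom Local: {ident: {key: value}}."""

    def __init__(self):
        # Bypass our own __setattr__ so 'storage' lands on the instance itself.
        object.__setattr__(self, 'storage', {})

    def __setattr__(self, key, value):
        ident = threading.get_ident()
        self.storage.setdefault(ident, {})[key] = value

    def __getattr__(self, item):
        # Only called when normal lookup fails, so 'storage' never ends up here.
        return self.storage.get(threading.get_ident(), {}).get(item)


local_obj = IsolatedLocal()
results = {}


def worker(i):
    local_obj.value = i
    time.sleep(0.05)              # give the other threads time to write
    results[i] = local_obj.value  # still this thread's own value


threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)
```

Each thread reads back the value it wrote itself, and the main thread, which never wrote anything, sees None.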
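Context management flow (first pass)
When a request arrives
Review
day 117: review of day 116
1. What are the differences between django and flask?
Differences: the mechanism for handling the request object
In common: both have views and routing, and both are built on the WSGI protocol
flask: highly extensible
2. Components flask provides:
- Configuration:
- Usage
- How it works
- Routing:
[implemented with a decorator]
[can also be done without a decorator]
- Ways to write a route
- Parameters
- Redirects
- Subdomains
- Routing is essentially decorator + closure
- Views
- CBV (class-based views)
- Blueprints
- Purpose:
- Splitting up the directory structure
- URL prefix
- Applying the special decorators
- Special decorators
before_request
after_request [the registered functions are reversed internally]
There seem to be six of them
???
3. How session is implemented
???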
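On item 3: these notes say elsewhere that Flask keeps the session in the browser's cookie. The general idea of a signed cookie can be sketched as below. This is not Flask's actual implementation (Flask delegates the signing to the itsdangerous library); `sign_session`, `load_session` and `SECRET_KEY` are hypothetical names used only for illustration.

```python
import base64
import hashlib
import hmac
import json

SECRET_KEY = b'example-secret'  # hypothetical; plays the role of app.secret_key


def sign_session(data):
    """Serialize a dict and append an HMAC so tampering can be detected."""
    payload = base64.urlsafe_b64encode(json.dumps(data).encode('utf-8'))
    sig = hmac.new(SECRET_KEY, payload, hashlib.sha256).hexdigest().encode('ascii')
    return payload + b'.' + sig


def load_session(cookie):
    """Return the dict if the signature matches, otherwise an empty session."""
    payload, _, sig = cookie.rpartition(b'.')
    expected = hmac.new(SECRET_KEY, payload, hashlib.sha256).hexdigest().encode('ascii')
    if not hmac.compare_digest(sig, expected):
        return {}
    return json.loads(base64.urlsafe_b64decode(payload).decode('utf-8'))


cookie = sign_session({'user': 'alex'})
print(load_session(cookie))
print(load_session(b'x' + cookie))  # tampered payload is rejected
```

The payload is only encoded, not hidden: anyone holding the cookie can read it, but cannot change it without knowing the key.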
s9day117
Warm-up:
- Partial functions: pre-fill arguments for you
import functools

def index(a1, a2):
    return a1 + a2

# new_func = functools.partial(index, 'the pre-filled argument')
ret = index(1, 2)
print(ret)

new_func = functools.partial(index, 'the pre-filled argument')
ret = new_func('the argument you pass yourself')
print(ret)
- Object orientation
- Calling the parent class's method
class Base(object):
    def func(self):
        print('Base.func')

class Foo(Base):
    def func(self):
        # Option 1: run the method according to the MRO order
        super(Foo, self).func()
        # Option 2: call the Base class method explicitly
        Base.func(self)
        print('Foo.func')

# Note the difference between super and Base
obj = Foo()
obj.func()

class Base(object):
    def func(self):
        super(Base, self).func()
        print('Base.func')

class Bar(object):
    def func(self):
        print('Bar.func')

class Foo(Base, Bar):
    pass

# sample - 1
obj = Foo()
obj.func()
print(Foo.__mro__)
# sample - 2  # raises AttributeError: object has no func
obj = Base()
obj.func()
Samples 1 and 2 show that super does not look up "the parent class"; it follows the MRO order
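class Foo(object):
    def __init__(self):
        self.storage = {}

    def __setattr__(self, key, value):
        print(self.storage)

# Written like this, it raises an error right away:
# self.storage = {} triggers __setattr__, which reads self.storage before it exists
# Pay attention to the order of execution and of calls,
# and to when an attribute comes into existence

class Foo(object):
    def __init__(self):
        object.__setattr__(self, 'storage', {})

    def __setattr__(self, key, value):
        print(self.storage)

# Called this way, it works fine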
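Stack: last in, first out [like a gun magazine]
class Stack(object):
    def __init__(self):
        self.data = []

    def push(self, val):
        self.data.append(val)

    def pop(self):  # remove and return the last element
        return self.data.pop()

    def top(self):  # always returns the last element without removing it
        return self.data[-1]
        # or, to return None on an empty stack:
        # if len(self.data) == 0:
        #     return None
        # return self.data[len(self.data) - 1]

_stack = Stack()
_stack.push('a')
_stack.push('b')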
Local object
"""
{
    ident: {}
}
"""
from threading import get_ident

class Local(object):
    def __init__(self):
        object.__setattr__(self, 'storage', {})

    def __setattr__(self, key, value):
        ident = get_ident()  # get the unique identifier
        if ident not in self.storage:
            self.storage[ident] = {key: value}
        else:
            self.storage[ident][key] = value

    def __getattr__(self, item):
        ident = get_ident()
        if ident in self.storage:
            return self.storage[ident].get(item)
        # return None

# Same class, but prefer the greenlet identifier when greenlet is installed
try:
    from greenlet import getcurrent as get_ident
except ImportError:
    from threading import get_ident

class Local(object):
    def __init__(self):
        object.__setattr__(self, 'storage', {})

    def __setattr__(self, key, value):
        ident = get_ident()  # get the unique identifier
        if ident not in self.storage:
            self.storage[ident] = {key: value}
        else:
            self.storage[ident][key] = value

    def __getattr__(self, item):
        ident = get_ident()
        if ident in self.storage:
            return self.storage[ident].get(item)
        # return None
Local object (as found in the source)
from flask import globals
# context locals
_request_ctx_stack = LocalStack()  # Local() is defined just above the LocalStack class in the same file

class Local(object):
    __slots__ = ('__storage__', '__ident_func__')

    def __init__(self):
        object.__setattr__(self, '__storage__', {})
        object.__setattr__(self, '__ident_func__', get_ident)

    def __iter__(self):
        return iter(self.__storage__.items())

    def __call__(self, proxy):
        """Create a proxy for a name."""
        return LocalProxy(self, proxy)

    def __release_local__(self):
        self.__storage__.pop(self.__ident_func__(), None)

    def __getattr__(self, name):
        try:
            return self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        ident = self.__ident_func__()
        storage = self.__storage__
        try:
            storage[ident][name] = value
        except KeyError:
            storage[ident] = {name: value}

    def __delattr__(self, name):
        try:
            del self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)
LocalStack
Context management: request
Context management: session
Third-party component: flask-session
Homework: code counting
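LocalStack, the first topic in the list above, can be pictured as a thin wrapper that keeps a list under the `stack` key of a per-thread Local. The following is a simplified sketch modelled on that idea, not Werkzeug's code; `SimpleLocal` and `SimpleLocalStack` are hypothetical names.

```python
from threading import get_ident


class SimpleLocal(object):
    """Per-thread storage: {ident: {key: value}}."""

    def __init__(self):
        object.__setattr__(self, 'storage', {})

    def __setattr__(self, key, value):
        self.storage.setdefault(get_ident(), {})[key] = value

    def __getattr__(self, item):
        return self.storage.get(get_ident(), {}).get(item)


class SimpleLocalStack(object):
    """Keeps a list under the 'stack' key of a per-thread SimpleLocal."""

    def __init__(self):
        self._local = SimpleLocal()

    def push(self, obj):
        stack = self._local.stack
        if stack is None:
            stack = []
            self._local.stack = stack
        stack.append(obj)
        return stack

    def pop(self):
        stack = self._local.stack
        if not stack:
            return None
        return stack.pop()

    @property
    def top(self):
        stack = self._local.stack
        return stack[-1] if stack else None


ctx_stack = SimpleLocalStack()
ctx_stack.push('ctx-1')
ctx_stack.push('ctx-2')
top_before = ctx_stack.top
popped = ctx_stack.pop()
top_after = ctx_stack.top
print(top_before, popped, top_after)
```

The stack lives inside the Local, so each thread pushes and pops its own contexts without seeing those of other threads.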
s9day118
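Code counting
pymysql
Database connection pool: DBUtils
First look at SQLAlchemy
[The most valuable part of flask is its context management]
Review:
flask:
Routing
Views
Blueprints
Context management: request
__call__
When a request comes in, the __call__ method is executed
Why __call__? Because of WSGI: the server invokes the app object as a callable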
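The point about __call__ can be made concrete with a toy WSGI application. This is a sketch, not Flask's source: `TinyApp` and `fake_start_response` are hypothetical, and the server is simulated by calling the app object directly.

```python
class TinyApp(object):
    """Hypothetical stand-in for Flask: a WSGI server only needs a callable."""

    def wsgi_app(self, environ, start_response):
        body = ('Hello from ' + environ.get('PATH_INFO', '/')).encode('utf-8')
        start_response('200 OK', [('Content-Type', 'text/plain; charset=utf-8'),
                                  ('Content-Length', str(len(body)))])
        return [body]

    def __call__(self, environ, start_response):
        # The server calls app(environ, start_response) once per request.
        return self.wsgi_app(environ, start_response)


app = TinyApp()

# Simulate what a WSGI server does, without opening a socket.
captured = {}


def fake_start_response(status, headers):
    captured['status'] = status


response = app({'PATH_INFO': '/home'}, fake_start_response)
print(captured['status'], b''.join(response))
```

Keeping the real work in wsgi_app and delegating from __call__ leaves room to wrap wsgi_app with middleware without replacing the app object.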
Code counting
[
shutil ???????
class FileStorage(object):
    def save(self, dst, buffer_size=16384):
        from shutil import copyfileobj
        close_dst = False
        if isinstance(dst, string_types):
            dst = open(dst, 'wb')
            close_dst = True
        try:
            copyfileobj(self.stream, dst, buffer_size)
        finally:
            if close_dst:
                dst.close()

shutil.py
def copyfileobj(fsrc, fdst, length=16*1024):
    """copy data from file-like object fsrc to file-like object fdst"""
    while 1:
        buf = fsrc.read(length)
        if not buf:
            break
        fdst.write(buf)
]
import os
from flask import Flask, request, render_template

app = Flask(__name__)
app.config['MAX_CONTENT_LENGTH'] = 1024 * 1024 * 7  # limit uploads to 7 MB
# If the user uploads more than 7 MB, a 413 error is returned

@app.route('/upload', methods=["GET", "POST"])
def upload():
    if request.method == "GET":
        return render_template('upload.html')
    # from werkzeug.datastructures import FileStorage
    file_obj = request.files.get('code')
    # print(type(file_obj))
    # print(file_obj.filename)  # file name
    # print(file_obj.stream)  # file content
    name_ext = file_obj.filename.rsplit('.', maxsplit=1)
    if len(name_ext) != 2:  # no extension
        return 'Please upload a zip archive'
    if name_ext[1] != "zip":
        return "Please upload a zip archive"
    file_path = os.path.join('files', file_obj.filename)  # build the save path
    # read the content from file_obj.stream and write it to the file
    file_obj.save(file_path)  # save the file
    return "Upload succeeded"
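The extension check inside the upload view can also be pulled out into a small function, which makes it easy to test on its own. A minimal sketch, assuming a hypothetical helper name `is_zip_name`; it uses os.path.splitext instead of rsplit and ignores letter case.

```python
import os


def is_zip_name(filename):
    """Return True when the file name ends with a .zip extension (case-insensitive)."""
    _, ext = os.path.splitext(filename or '')
    return ext.lower() == '.zip'


print(is_zip_name('project.zip'))
print(is_zip_name('project.tar.gz'))
print(is_zip_name('README'))
```

A matching name does not prove the content is a zip archive; zipfile.is_zipfile can be used on the saved file or stream for that.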
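Extracting a zip file
import shutil
# Opens the archive, reads its contents and extracts them
# (_unpack_zipfile is a private helper of shutil; it accepts a path or a file object)
shutil._unpack_zipfile("path of the file to extract", "directory to extract into")
File upload and extraction
3. Receive the user's upload and extract it into a given directory
import shutil
shutil._unpack_zipfile(file_obj.stream, 'extraction directory')
# The directory name needs to be made unique
import uuid
target_path = os.path.join('files', str(uuid.uuid4()))
# or give every uploaded file its own unique directory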
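The two steps above (a unique target directory, then extraction from the upload stream) can be combined as sketched below. The sketch swaps in the public zipfile module for the private shutil._unpack_zipfile; `extract_upload` is a hypothetical helper, and an in-memory zip stands in for file_obj.stream.

```python
import io
import os
import tempfile
import uuid
import zipfile


def extract_upload(stream, base_dir):
    """Extract a zip file-like object into base_dir/<uuid4> and return that path."""
    target_path = os.path.join(base_dir, str(uuid.uuid4()))
    os.makedirs(target_path)
    with zipfile.ZipFile(stream) as zf:
        zf.extractall(target_path)
    return target_path


# Build a small zip in memory to stand in for file_obj.stream.
buf = io.BytesIO()
with zipfile.ZipFile(buf, 'w') as zf:
    zf.writestr('pkg/main.py', 'print("hi")\n')
buf.seek(0)

base_dir = tempfile.mkdtemp()
target = extract_upload(buf, base_dir)
print(os.path.exists(os.path.join(target, 'pkg', 'main.py')))
```

Because every upload gets its own uuid4 directory, two users uploading archives with the same file names do not overwrite each other.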
Walking a directory
# listdir: everything directly under the given directory
for item in os.listdir(target_path):
    print(item)

total_num = 0  # total number of lines
for base_path, folder_list, file_list in os.walk(target_path):
    for file_name in file_list:
        file_path = os.path.join(base_path, file_name)
        # full path of a single file
        file_ext = file_path.rsplit('.', maxsplit=1)
        if len(file_ext) != 2:
            continue
        if file_ext[1] != 'py':
            continue
        file_num = 0
        with open(file_path, 'rb') as f:
            for line in f:
                # strip spaces and newlines
                line = line.strip()
                if not line:
                    continue
                if line.startswith(b'#'):  # comment
                    continue
                # third case: """ triple-quoted blocks (not handled here)
                file_num += 1
        print(file_num, file_path)
        total_num += file_num
strip() ??????????
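The loop above leaves the third case, triple-quoted blocks, unhandled. One possible way to skip them is sketched here. It is a heuristic, not a parser: `count_code_lines` is a hypothetical helper, it works on text lines rather than the bytes read above, and it only recognizes blocks whose first line starts with the triple quote.

```python
def count_code_lines(lines):
    """Count lines that are not blank, not '#' comments, and not docstring blocks."""
    count = 0
    in_block = None  # delimiter of the currently open triple-quoted block
    for raw in lines:
        line = raw.strip()
        if in_block:
            if in_block in line:
                in_block = None  # this line closes the block
            continue
        if not line or line.startswith('#'):
            continue
        for quote in ('"""', "'''"):
            if line.startswith(quote):
                # A one-line docstring opens and closes on the same line.
                if line.count(quote) < 2:
                    in_block = quote
                break
        else:
            count += 1
    return count


sample = '''
# comment
def f():
    """one-line docstring"""
    x = 1

    """
    multi-line
    docstring
    """
    return x
'''.splitlines()

print(count_code_lines(sample))
```

In the sample only `def f():`, `x = 1` and `return x` are counted. A string assigned with triple quotes (for example `sql = """...`) is still counted as code by this heuristic.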
Working with the database through pymysql
helper.py
import pymysql
from ..settings import Config

def connect():
    """
    Take a connection from the pool and create a cursor
    :return: (conn, cursor)
    """
    conn = Config.POOL.connection()
    cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
    return conn, cursor

def connect_close(conn, cursor):
    """
    Release the cursor and the connection
    :param conn:
    :param cursor:
    :return:
    """
    cursor.close()
    conn.close()

def fetch_all(sql, args):
    """
    Fetch all rows of a select
    :param sql:
    :param args:
    :return:
    """
    conn, cursor = connect()
    cursor.execute(sql, args)
    record_list = cursor.fetchall()
    connect_close(conn, cursor)
    return record_list

def fetch_one(sql, args):
    """
    Return the first row that matches the sql
    :param sql:
    :param args:
    :return:
    """
    conn, cursor = connect()
    cursor.execute(sql, args)
    result = cursor.fetchone()
    connect_close(conn, cursor)
    return result

def insert(sql, args):
    """
    Insert data and return the number of affected rows
    :param sql:
    :param args:
    :return:
    """
    conn, cursor = connect()
    row = cursor.execute(sql, args)
    conn.commit()
    connect_close(conn, cursor)
    return row
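In the helpers above, the connection is not released if execute raises. One way to guarantee the release is a context manager, sketched here. To keep the example runnable without a MySQL server, sqlite3 stands in for the pymysql pool, so `connect` below is a replacement for illustration and `db_cursor` is a hypothetical name.

```python
import sqlite3
from contextlib import contextmanager


def connect():
    """Stand-in for the helper's connect(); sqlite3 replaces the pymysql pool here."""
    conn = sqlite3.connect(':memory:')
    return conn, conn.cursor()


@contextmanager
def db_cursor():
    """Yield a cursor and always release cursor and connection, even on errors."""
    conn, cursor = connect()
    try:
        yield cursor
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()
        conn.close()


with db_cursor() as cursor:
    cursor.execute('CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)')
    cursor.execute('INSERT INTO users (name) VALUES (?)', ('alex',))
    cursor.execute('SELECT name FROM users')
    rows = cursor.fetchall()

print(rows)
```

Note that sqlite3 uses `?` placeholders while pymysql uses `%s`; the structure of the context manager stays the same.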
Database connection pool ***
settings.py
from DBUtils.PooledDB import PooledDB, SharedDBConnection  # DBUtils 1.x path; DBUtils 2.x uses dbutils.pooled_db
import pymysql

# salt
CONFIG_KEYS = {
    "SALT": bytes('小雞頓蘑菇', encoding='utf-8')
}

class Config(object):
    SALT = bytes('小雞頓蘑菇', encoding='utf-8')  # salt for password hashing
    SECRET_KEY = "passlslk。、dflsadk fsaldk"  # key for the session
    MAX_CONTENT_LENGTH = 1024 * 1024 * 10  # maximum upload size (10 MB)
    POOL = PooledDB(
        creator=pymysql,  # the module used to connect to the database
        maxconnections=6,  # maximum connections the pool allows; 0 or None means unlimited
        mincached=2,  # idle connections created at initialization; 0 means none
        maxcached=5,  # maximum idle connections kept in the pool; 0 or None means unlimited
        maxshared=3,
        # maximum number of shared connections; 0 or None means all connections are dedicated. PS: it has no effect here, because pymysql, MySQLdb and similar modules report threadsafety 1, so whatever value is set, _maxshared is always 0.
        blocking=True,  # when no connection is available: True waits, False raises an error instead of waiting
        maxusage=None,  # how many times one connection may be reused; None means unlimited
        setsession=[],  # commands executed before a session starts, e.g. ["set datestyle to ...", "set time zone ..."]
        ping=0,
        # ping the MySQL server to check that it is available: 0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
        host='127.0.0.1',
        port=3306,
        user='root',
        password='',
        database='flask',
        charset='utf8mb4'
    )
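The meaning of maxconnections and blocking can be illustrated with a toy pool built on queue.Queue. This is only a sketch of the pooling idea, not how DBUtils is implemented: `TinyPool` and its `release` method are hypothetical, connections are created eagerly, and plain objects stand in for database connections.

```python
import queue


class TinyPool(object):
    """Hypothetical fixed-size pool: hands out objects and takes them back."""

    def __init__(self, creator, maxconnections):
        self._idle = queue.Queue(maxsize=maxconnections)
        for _ in range(maxconnections):
            self._idle.put(creator())

    def connection(self, blocking=True, timeout=None):
        # blocking=True waits for a free connection; blocking=False raises queue.Empty.
        return self._idle.get(block=blocking, timeout=timeout)

    def release(self, conn):
        self._idle.put(conn)


pool = TinyPool(creator=object, maxconnections=2)
a = pool.connection()
b = pool.connection()
try:
    pool.connection(blocking=False)
    exhausted = False
except queue.Empty:
    exhausted = True

pool.release(a)
c = pool.connection(blocking=False)  # succeeds again after a release
print(exhausted, c is a)
```

With PooledDB the release step is hidden: calling close() on a pooled connection hands it back to the pool instead of closing the underlying connection.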
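s9day119 review
-. Understanding django and flask
-. Flask basics:
Configuration: app.config.from_object()  # two ways
Reflection + importlib
Routing
Decorator: @app.route()
Parameters
url
endpoint: used by url_for; defaults to the view function name
methods: GET/POST, given as a list []
etc.
Adding custom decorators:
functools.wraps
endpoint defaults to the function name
Two ways to register a route
Decorator: @app.route()
add_url_rule()
Custom URL converters that support regular expressions
session:
How it works: the data is stored in the browser's cookies in signed form (readable, but tamper-evident)
Blueprints:
Splitting up the directory structure
URL prefix
Special decorators:
- Context management
threading.local: data isolation [each thread gets its own storage, so data is isolated between threads]
Use case: DBUtils uses it when creating one database connection per thread
Special methods in object orientation:
getattr
setattr
delattr
Partial functions
Singleton pattern
Request context flow:
Source-code flow:
When a request arrives, __call__ is executed
wsgi_app()
ctx = RequestContext(): wraps the request data + an empty session
ctx.push(): passes ctx to the LocalStack object, which in turn stores it in Local
Question: how does Local store data?
__storage__={
    ident:{}
}
Question: what is LocalStack for?
ident:{stack:[ctx]}
-. Database and front end
Bootstrap
What is responsive layout? The page displays differently depending on the window size
Important things in Python:
Generators, ??????
Iterators, ??????
Decorators, ??????
Reflection, ?????
Context management: LocalProxy
Context management:
Request context: request
App context: app/g
Third-party component: wtforms
Usage
How it works
Local/LocalStack/LocalProxy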