Python版——博客網站<一>基礎模塊與框架搭建

開源地址:https://github.com/leebingbin/Python3.WebAPP.Blog前端

 

1、搭建開發環境python

    首先,確認系統安裝的Python版本是否爲3.6.x:mysql

    在Mac上安裝Python。若是你正在使用Mac,系統是OS X 10.8~10.10,那麼系統自帶的Python版本是2.7。要安裝最新的Python 3.6,有兩個方法:android

    方法一:從Python官網下載Python 3.6的安裝程序,雙擊運行並安裝;ios

    方法二:若是安裝了Homebrew,直接經過命令brew install python3安裝便可。git

    而後,用pip安裝開發Web App須要的第三方庫:github

異步框架aiohttp:web

前端模板引擎jinja2:sql

    MySQL 5.x數據庫,從官方網站下載並安裝,安裝完畢後,請務必牢記root口令。數據庫


    MySQL的Python異步驅動程序aiomysql:

項目結構
    選擇一個工做目錄,而後,咱們創建以下的目錄結構:

blog-python3-webapp/     <-- 根目錄
|
+- backup/               <-- 備份目錄
|
+- conf/                 <-- 配置文件
|
+- dist/                 <-- 打包目錄
|
+- www/                  <-- Web目錄,存放.py文件
|  |
|  +- static/            <-- 存放靜態文件
|  |
|  +- templates/         <-- 存放模板文件
|
+- ios/                  <-- 存放iOS App工程
|
+- android/                  <-- 存放Android App工程
|
+- LICENSE               <-- 代碼LICENSE

    建立好項目的目錄結構後,同時創建git倉庫並同步至GitHub (地址:https://github.com/leebingbin/Python3.WebAPP.Blog)。

    因爲咱們的blog-python3-webapp創建在asyncio的基礎上,所以用aiohttp寫一個基本的app.py測試異步web服務環境等是否正常。
    在測試以前,須要先安裝第三方模塊asyncio。asyncio是Python 3.4版本引入的標準庫,直接內置了對異步IO的支持。python 3.4開始就支持異步IO編程,提供了asyncio庫,可是3.4中採用的是@asyncio.coroutine和yield from這和原來的generator關鍵字yield很差區分,在3.5之後,採用了async(表示協程)和await關鍵字,這樣就好區分多了。因爲Web框架使用了基於asyncio的aiohttp,這是基於協程的異步模型。在協程中,不能調用普通的同步IO操做,由於全部用戶都是由一個線程服務的,協程的執行速度必須很是快,才能處理大量用戶的請求。而耗時的IO操做不能在協程中以同步的方式調用,不然,等待一個IO操做時,系統沒法響應任何其餘用戶。
    異步編程的一個原則:一旦決定使用異步,則系統每一層都必須是異步。幸運的是aiomysql爲MySQL數據庫提供了異步IO的驅動。

    但在這以前,須要先安裝MySQL;畢竟Web App裏面有不少地方都要訪問數據庫。採用dmg的方式安裝

    一、下載完成後,雙擊打開

    二、右鍵,選打開,緊接着按照提示,傻瓜式安裝便可;可是當彈出一個MYSQL Installer提示框的時候必定打開備忘錄複製粘貼記下彈出框的密碼(這是你MySQL的root帳戶的密碼)

   三、 打開MySQL服務

    四、配置路徑
    用文本編輯器打開.bash_profile,加入PATH=$PATH:/usr/local/mysql/bin並保存。我習慣用 Atom 做爲文本編輯器,或者直接用自帶的 vim 編輯。在命令行輸入source ~/.bash_profile路徑就配置好了。

    五、登錄並修改密碼
    登陸——mysql -u root -p登錄,輸入以前保存的密碼

    修改密碼——用SET PASSWORD FOR 'root'@'localhost' = PASSWORD('root') ;

    修改密碼。若是你的版本比較新的話(好比我安裝的是5.7.x的版本),就會出現以下提示,這個時候已經更新了密碼,可是會有 warning。若是想查看warning,能夠用SHOW WARNINGS。

    六、Mac下MySQL卸載方法 (若是安裝錯或忘了密碼,還不如卸載重裝)
    mac下mysql的DMG格式安裝內有安裝文件,卻沒有卸載文件……很鬱悶的事。
    先中止全部mysql有關進程。

sudo rm /usr/local/mysql
sudo rm -rf /usr/local/mysql*
sudo rm -rf /Library/StartupItems/MySQLCOM
sudo rm -rf /Library/PreferencePanes/My*

#若是電腦上沒有這個文件,跳過這步驟就好
#若是有打開文件,去掉MYSQLCOM=-YES-保存便可
vim /etc/hostconfig  (and removed the line MYSQLCOM=-YES-)


rm -rf ~/Library/PreferencePanes/My*
sudo rm -rf /Library/Receipts/mysql*
sudo rm -rf /Library/Receipts/MySQL*
sudo rm -rf /var/db/receipts/com.mysql.*

2、用Python寫一個ORM

    搭建好環境以後,那就讓咱們用用Python寫一個ORM吧!

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

__author__ = 'libingbin2015@aliyun.com'

import asyncio, logging

import aiomysql

def log(sql, args=()):
    logging.info('SQL: %s ' % sql)

#建立鏈接池
async def create_pool(loop, **kw):
    logging.info('create database connection pool...')
    global __pool
    __pool = await aiomysql.create_pool(
        host = kw.get('host', 'localhost'),
        port = kw.get('port', 3306),
        user = kw['root'],
        password = kw['toor'],
        db = kw['db'],
        charset = kw.get('charset', 'utf8mb4'),    #utf8mb4是utf8的超集,徹底兼容utf8,支持emoji直接插入存儲
        autocommit = kw.get('autocommit', True),
        maxsize = kw.get('maxsizze', 10),
        minsize = kw.get('minsize', 1),
        loop = loop
     )

#CRUD之R
async def select(sql, args, size=None):
    log(sql, args)
    global __pool
    async with __pool.get() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cur:
            await cur.execute(sql.replace('?', '%s'), args or ())   #SQL語句的佔位符是?,而MySQL的佔位符是%s,select()函數在內部自動替換。防止SQL注入攻擊:注意要始終堅持使用帶參數的SQL,而不是本身拼接SQL字符串
            if size:
                rs = await cur.fetchmany(size)  #獲取最多指定數量的記錄
            else:
                rs = await cur.fetchall()   #獲取全部記錄
        logging.info('rows returned: %s' % len(rs))
        return rs

#定義通用的execute()函數,用於執行CRUD方法
async def execute(sql, args, autocommit=True):
    log(sql)
    async with __pool.get() as conn:
        if not autocommit:
            await conn.begin()
        try:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                await cur.execute(sql.replace('?', '%s'), args)
                affected = cur.rowcount
            if not autocommit:
                await conn.commit()
        except BaseException as e:
            if not autocommit:
                await conn.rollback()
            raise
        return affected

def create_args_string(num):
    L = []
    for n in range(num):
        L.append('?')
    return ','.join(L)

class Field(object):

    def __init__(self, name, column_type, primary_key, default):
        self.name = name
        self.colum_type = column_type
        self.primary_key = primary_key
        self.default = default

    def __str__(self):
        return '<%s , $s:%s>' % (self.__class__.__name__, self.colum_type, self.name)

class StringField(Field):

    def __init__(self, name=None, primary_key=False, default=None, ddl='varchar(100)'):
        super().__init__(name, ddl, primary_key, default)

class BooleanField(Field):

    def __init__(self, name=None, default=False):
        super().__init__(name, 'boolean', False, default)

class IntegerField(Field):

    def __init__(self, name=None, primary_key=False, default=0):
        super().__init__(name, 'bigint', primary_key, default)

class FloatField(Field):

    def __init__(self, name=None, primary_key=False, default=0.0):
        super().__init__(name, 'real', primary_key, default)

class TextField(Field):

    def __init__(self, name=None, default=None):
        super().__init__(name, 'text', False, default)

class ModeMetaclass(type):

    def __new__(cls, name, bases, attrs):
        if name == 'Model':
            return type.__new__(cls, name, bases, attrs)
        tableName = attrs.get('__table__', None) or name
        logging.info('found model: %s (table: %s)' % (name, tableName))
        mappings = dict()
        fields = []
        primaryKey = None
        for k, v in attrs.items():
            if isinstance(v, Field):
                logging.info(' found mapping: %s ==> %s' % (k, v))
                mappings[k] = v
                if v.primary_key:
                    # 找到主鍵
                    if primaryKey:
                        raise StandardErroe('Duplicate primary key for field: %s' % k)
                    primaryKey = k
                else:
                    fields.append(k)
        if not primaryKey:
            raise StandardError('Primary key not found.')
        for k in mappings.keys():
            attrs.pop(k)
        escaped_fields = list(map(lambda f: '`%s`' % f, fields))
        attrs['__mappings__'] = mappings    # 保存屬性和列的映射關係
        attrs['__table__'] = tableName
        attrs['__primary_key__'] = primaryKey   # 主鍵屬性名
        attrs['__fields__'] = fields    # 除主鍵外的屬性名
        attrs['__select__'] = 'select `%s`, %s from `%s`' % (primaryKey, ', '.join(escaped_fields), tableName)
        attrs['__insert__'] = 'insert into `%s` (%s, `%s`) values (%s)' % (tableName, ', '.join(escaped_fields), primaryKey, create_args_string(len(escaped_fields) + 1))
        attrs['__update__'] = 'update `%s` set %s where `%s` = ?' % (tableName, ', '.join(map(lambda  f: '`%s` = ?' % (mappings.get(f).name or f), fields)), primaryKey)
        attrs['__delete__'] = 'delete from `%s` where `%s` = ?' % (tableName, primaryKey)
        return type.__new__(cls, name, bases, attrs)

class Model(dict, metaclass=ModelMetaclass):

    def __init__(self, **kw):
        super(Model, self).__init__(**kw)

    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(r"'Model' object has no attribute '%s'" % key)

    def __setattr__(self, key, value):
        self[key] = value

    def getValue(self, key):
        return getattr(self, key, None)

    def getValueOrDefault(self, key):
        value = getattr(self, key, None)
        if value is None:
            field = self.__mappings__[key]
            if field.default is not None:
                value = field.default() if callable(field.default) else field.default
                logging.debug('using default value for %s : %s' % (key, str(value)))
                setattr(self, key, value)
        return value

    @classmethod
    async def findall(cls, where=None, args=None, **kw):
        ' find objects by where clause.'
        sql = [cls.__slect__]
        if where:
            sql.append('where')
            sql.append(where)
        if args is None:
            args = []
        orderBy = kw.get('orderBy', None)
        if orderBy:
            sql.append('order by')
            sql.append(orderBy)
        limit = kw.get('limit', None)
        if limit is not None:
            sql.append('limit')
            if isinstance(limit, int):
                sql.append('?')
                args.append(limit)
            elif isinstance(limit, tuple) and len(limit) == 2:
                sql.append('?, ?')
                args.extend(limit)
            else:
                raise ValueError('Invalid limit value: %s' % str(limit))
        rs = await select(' '.join(sql), args)
        return [cls(**r) for r in rs]

    @classmethod
    async def findNumber(cls, selectField, where=None, args=None):
        ' find number by select and where. '
        sql = ['select %s _num_ from `%s`' % (selectField, cls.__table__)]
        if where:
            sql.append('where')
            sql.append(where)
        rs = await select(' '.join(sql), args, 1)
        if len(rs) == 0:
            return None
        return rs[0]['_num_']

    @classmethod
    async def find(cls, pk):
        ' find object by primary key. '
        rs = await select('%s where `%s`=?' % (cls.__select__, cls.__primary_key__), [pk], 1)
        if len(rs) == 0:
            return None
        return cls(**rs[0])

    async def save(self):
        args = list(map(self.getValueOrDefault, self.__fields__))
        args.append(self.getValueOrDefault(self.__primary_key__))
        rows = await execute(self.__inset__, args)
        if rows != 1:
            logging.warning('failed to update by primary key: affected rows: %s' % rows)

    async def remove(self):
        args = [self.getValue(self.__primary_key__)]
        rows = await execute(self.__delete__, args)
        if rows != 1:
            logging.warning('failed to remove by primary key: affected rows: %s' % rows)

    有了ORM,咱們就能夠把Python3.WebAPP.Blog須要的3個表用Model表示出來:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

'''
    Models for user, blog, comment.
'''


__author__ = 'libingbin2015@aliyun.com'

import time, uuid

from orm import Model, StringField, BooleanField, FloatField, TextField

def next_id():
    return '%015%s000' % (int(time.time() * 1000), uuid.uuid4().hex)

class User(Model):
    __table__ = 'users'

    id = StringField(primary_key=True, default=next_id(), ddl='varchar(50)')
    email = StringField(ddl='varchar(50)')
    passwd = StringField(ddl='varchar(50)')
    admin = BooleanField()
    name = StringField(ddl='varchar(50)')
    image = StringField(ddl='varchar(500)')
    created_at = FloatField(default=time.time)

class Blog(Model):
    __table__ = 'blogs'

    id = StringField(primary_key=True, default=next_id, ddl='varchar(50)')
    user_id = StringField(ddl='varchar(50)')
    user_name = StringField(ddl='varchar(50)')
    user_image = StringField(ddl='varchar(500)')
    name = StringField(ddl='varchar(50)')
    summary = StringField(ddl='varchar(200)')
    content = TextField()
    created_at = FloatField(default=time.time)

class Comment(Model):
    __table__ = 'comments'

    id = StringField(primary_key=True, default=next_id, ddl='varchar(50)')
    blog_id = StringField(ddl='varchar(50)')
    user_id = StringField(ddl='varchar(50)')
    user_name = StringField(ddl='varchar(50)')
    user_image = StringField(ddl='varchar(500)')
    content = TextField()
    created_at = FloatField(default=time.time)

3、編寫Web框架

    開始Web開發前,咱們須要編寫一個Web框架。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

__author__ = 'libingbin2015@aliyun.com'

import asyncio, os, inspect, logging, functools

from urllib import parse

from aiohttp import web

from apis import APIError

def get(path):
    '''
    Define decorator @get('/path')
    '''
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            return func(*args, **kw)
        wrapper.__method__ = 'GET'
        wrapper.__route__ = path
        return wrapper
    return decorator

def post(path):
    '''
    Define decorator @post('/path')
    '''
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kw):
            return func(*args, **kw)
        wrapper.__method__ = 'POST'
        wrapper.__route__ = path
        return wrapper
    return decorator

def get_required_kw_args(fn):
    args = []
    params = inspect.signature(fn).parameters
    for name, param in params.items():
        if param.kind == inspect.Parameter.KEYWORD_ONLY and param.default == inspect.Parameter.empty:
            args.append(name)
    return tuple(args)

def get_named_kw_args(fn):
    args = []
    params = inspect.signature(fn).parameters
    for name, param in params.items():
        if param.kind == inspect.Parameter.KEYWORD_ONLY:
            args.append(name)
    return tuple(args)

def has_named_kw_args(fn):
    params = inspect.signature(fn).parameters
    for name, param in params.items():
        if param.kind == inspect.Parameter.KEYWORD_ONLY:
            return True

def has_var_kw_arg(fn):
    params = inspect.signature(fn).parameters
    for name, param in params.items():
        if param.kind == inspect.Parameter.VAR_KEYWORD:
            return True

def has_request_arg(fn):
    sig = inspect.signature(fn)
    params = sig.parameters
    found = False
    for name, param in params.items():
        if name == 'request':
            found = True
            continue
        if found and (param.kind != inspect.Parameter.VAR_POSITIONAL and param.kind != inspect.Parameter.KEYWORD_ONLY and param.kind != inspect.Parameter.VAR_KEYWORD):
            raise ValueError('request parameter must be the last named parameter in function: %s%s' % (fn.__name__, str(sig)))
    return found

class RequestHandler(object):

    def __init__(self, app, fn):
        self._app = app
        self._func = fn
        self._has_request_arg = has_request_arg(fn)
        self._has_var_kw_arg = has_var_kw_arg(fn)
        self._has_named_kw_args = has_named_kw_args(fn)
        self._named_kw_args = get_named_kw_args(fn)
        self._required_kw_args = get_required_kw_args(fn)

    async def __call__(self, request):
        kw = None
        if self._has_var_kw_arg or self._has_named_kw_args or self._required_kw_args:
            if request.method == 'POST':
                if not request.content_type:
                    return web.HTTPBadRequest('Missing Content-Type.')
                ct = request.content_type.lower()
                if ct.startswith('application/json'):
                    params = await request.json()
                    if not isinstance(params, dict):
                        return web.HTTPBadRequest('JSON body must be object.')
                    kw = params
                elif ct.startswith('application/x-www-form-urlencoded') or ct.startswith('multipart/form-data'):
                    params = await request.post()
                    kw = dict(**params)
                else:
                    return web.HTTPBadRequest('Unsupported Content-Type: %s' % request.content_type)
            if request.method == 'GET':
                qs = request.query_string
                if qs:
                    kw = dict()
                    for k, v in parse.parse_qs(qs, True).items():
                        kw[k] = v[0]
        if kw is None:
            kw = dict(**request.match_info)
        else:
            if not self._has_var_kw_arg and self._named_kw_args:
                # remove all unamed kw:
                copy = dict()
                for name in self._named_kw_args:
                    if name in kw:
                        copy[name] = kw[name]
                kw = copy
            # check named arg:
            for k, v in request.match_info.items():
                if k in kw:
                    logging.warning('Duplicate arg name in named arg and kw args: %s' % k)
                kw[k] = v
        if self._has_request_arg:
            kw['request'] = request
        # check required kw:
        if self._required_kw_args:
            for name in self._required_kw_args:
                if not name in kw:
                    return web.HTTPBadRequest('Missing argument: %s' % name)
        logging.info('call with args: %s' % str(kw))
        try:
            r = await self._func(**kw)
            return r
        except APIError as e:
            return dict(error=e.error, data=e.data, message=e.message)

def add_static(app):
    path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'static')
    app.router.add_static('/static/', path)
    logging.info('add static %s => %s' % ('/static/', path))

def add_route(app, fn):
    method = getattr(fn, '__method__', None)
    path = getattr(fn, '__route__', None)
    if path is None or method is None:
        raise ValueError('@get or @post not defined in %s.' % str(fn))
    if not asyncio.iscoroutinefunction(fn) and not inspect.isgeneratorfunction(fn):
        fn = asyncio.coroutine(fn)
    logging.info('add route %s %s => %s(%s)' % (method, path, fn.__name__, ', '.join(inspect.signature(fn).parameters.keys())))
    app.router.add_route(method, path, RequestHandler(app, fn))

def add_routes(app, module_name):
    n = module_name.rfind('.')
    if n == (-1):
        mod = __import__(module_name, globals(), locals())
    else:
        name = module_name[n+1:]
        mod = getattr(__import__(module_name[:n], globals(), locals(), [name]), name)
    for attr in dir(mod):
        if attr.startswith('_'):
            continue
        fn = getattr(mod, attr)
        if callable(fn):
            method = getattr(fn, '__method__', None)
            path = getattr(fn, '__route__', None)
            if method and path:
                add_route(app, fn)

    編寫配置文件config_default.py :

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

__author__ = 'libingbin2015@aliyun.com'

'''
    Default configurations.
'''

# 'secret': secret 是必需的選項,這是用於簽名會話ID cookie的密鑰。這能夠是單個密鑰的字符串或多個祕密的數組(只有第一個元素將用於簽名會話ID cookie)而在驗證請求中的簽名時,將考慮全部元素
# 另外, 考慮到安全性, 這個密鑰是不建議存儲在的程序中的. 最好的方法是存儲在你的系統環境變量中, 經過 os.getenv(key, default=None) 得到

configs = {
    'debug': True,
    'db': {
        'host': '127.0.0.1',
        'port': 3306,
        'user': 'root',
        'password': 'toor',
        'db': 'db'
    },
    'session': {
        'secret': 'libingbin2015@aliyun.com'
    }
}

    編寫配置文件config_override.py :

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

__author__ = 'libingbin2015@aliyun.com'

'''
    Override configurations.
'''



configs = {
    'db': {
        'host': '127.0.0.1'
    }
}

    把 config_default.py 做爲開發環境的標準配置,把 config_override.py 做爲生產環境的標準配置,咱們就能夠既方便地在本地開發,又能夠隨時把應用部署到服務器上。應用程序讀取配置文件須要優先從config_override.py 讀取。爲了簡化讀取配置文件,能夠把全部配置讀取到統一的 config.py 中:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

__author__ = 'libingbin2015@aliyun.com'

'''
    Configuration
'''


import config_default

class Dict(dict):
    '''
        Simple dict but support access as x.y style.
    '''
    def __init__(self, names=(), values=(), **kw):
        super(Dict, self).__init__(**kw)
        for k, v in zip(names, values):
            self[k] = v

    def __getattr__(self, key):
        try:
            return self[key]
        except KeyError:
            raise AttributeError(r"'Dict' object has no attribute '%s'" % key)

    def __setattr__(self, key, value):
        self[key] = value

def merge(defaults, override):
    r = {}
    for k, v in defaults.items():
        if k in override:
            if isinstance(v, dict):
                r[k] = merge(v, override[k])
            else:
                r[k] = override[k]
        else:
            r[k] = v
    return r

def toDict(d):
    D = Dict()
    for k, v in d.items():
        D[k] = toDict(v) if isinstance(v, dict) else v
    return D

configs = config_default.configs

try:
    import config_override
    configs = merge(configs, config_override.configs)
except ImportError:
    pass

configs = toDict(configs)

本文爲博主原創文章,轉載請註明出處!

https://my.oschina.net/u/3375733/blog/

相關文章
相關標籤/搜索