npm install -g @tarojs/cli
css
taro init taro-login
html
cd taro-login
前端
npm run dev:weapp
python
... class App extends Component { config = { pages: [ 'pages/user/user', // new 'pages/index/index', ],
如果我們需要用戶一進入就取得用戶的授權,以便於進行某些記錄用戶信息的操作,而微信又要求用戶去點頁面上的某個按鈕才能獲取信息,那怎麼辦呢?只能把一個按鈕放在用戶不能不點的地方,那就只有彈窗了。微信wx.showModal
不能滿足我們的需求,只能自己造一個,在用戶第一次進來的時候彈窗,再次進來的時候則不顯示。爲了讓這個組件具備拓展性,我們根據傳入的值來修改確認
位置按鈕的屬性,如果是授權的彈窗就改按鈕屬性爲openType='getUserInfo'
。(摘自 Taro 多端開發實現原理與項目實戰)
import Taro, { Component } from '@tarojs/taro' import { View, Button } from '@tarojs/components' import './modal.scss' class Modal extends Component { constructor() { super(...arguments) this.state = {} } onConfirmClick = () => { this.props.onConfirmCallback() } onCancelClick = () => { this.props.onCancelCallback() } onAuthConfirmClick = (e) => { this.props.onConfirmCallback(e.detail) } preventTouchMove = (e) => { e.stopPropagation() } render() { const { title, contentText, cancelText, confirmText, isAuth } = this.props return ( <View className='toplife_modal' onTouchMove={this.preventTouchMove}> <View className='toplife_modal_content'> <View className='toplife_modal_title'>{title}</View> <View className='toplife_modal_text'>{contentText}</View> <View className='toplife_modal_btn'> <Button className='toplife_modal_btn_cancel' onClick={this.onCancelClick}>{cancelText}</Button> {!isAuth ? <Button className='toplife_modal_btn_confirm' onClick={this.onConfirmClick}>{confirmText}</Button> : <Button className='toplife_modal_btn_confirm' openType='getUserInfo' onGetUserInfo={this.onAuthConfirmClick}>受權</Button>} </View> </View> </View> ) } } Modal.defaultProps = { title: '', contentText: '', cancelText: '取消', confirmText: '肯定', isAuth: false, onCancelCallback: () => { }, onConfirmCallback: () => { } } export default Modal
Modal
組件還算比較簡單,組件的屬性:
字段 | 說明 |
---|---|
title | 提示的標題 |
contentText | 提示的描述 |
cancelText | 取消按鈕的文案 |
onCancelCallback | 取消回調的函數 |
confirmText | 確認按鈕的文案 |
onConfirmCallback | 確認回調函數 |
isAuth | 標記是否爲受權按鈕 |
在內部設置了一個函數preventTouchMove
,其做用是彈窗出現蒙層的時候,阻止在蒙版上的滑動手勢onTouchMove
。另一個函數onAuthConfirmClick
, 當isAuth
爲真時,確認按鈕爲取得我的信息的受權按鈕,此時把我的信息當值傳遞給調用的函數。(摘自 Taro 多端開發實現原理與項目實戰)
/*postcss-pxtransform rn eject enable*/

// Full-screen mask that hosts the modal.
.toplife_modal {
  position: fixed;
  width: 100%;
  height: 100%;
  left: 0;
  top: 0;
  background-color: rgba(0, 0, 0, .8);
  z-index: 100;

  // Dialog box centred on the mask.
  &_content {
    position: absolute;
    left: 50%;
    top: 50%;
    width: 600px;
    height: 320px;
    transform: translate(-50%, -50%);
    background-color: #fff;
    color: #232321;
    text-align: center;
    border-radius: 30px;
  }

  &_title {
    margin-top: 40px;
    font-size: 32px;
  }

  &_text {
    margin-top: 40px;
    font-size: 24px;
  }

  // Button row pinned to the bottom of the dialog.
  &_btn {
    position: absolute;
    bottom: 0;
    left: 0;
    width: 100%;
    height: 88px;
    border-top: 2px solid #eee;

    &_cancel {
      color: #8c8c8c;
      border-radius: 0;
      border: 0;
      border-right: 2px solid #eee;
      border-bottom-left-radius: 30px;
    }

    &_confirm {
      color: #666;
      border-radius: 0;
      border: 0;
      border-bottom-right-radius: 30px;
    }

    // Each of the two buttons takes half of the row.
    button {
      display: block;
      float: left;
      width: 50%;
      height: 88px;
      text-align: center;
      line-height: 88px;
      font-size: 32px;
      box-sizing: border-box;
      background-color: #fff;

      &::after {
        border: 0;
      }
    }
  }
}
user.js
中引用該Modal
組件import Taro, { Component } from '@tarojs/taro'; import { View, Image, Text } from '@tarojs/components'; import classnames from 'classnames' import Modal from '../../components/modal/modal'; import { setGlobalData } from '../../utils/globalData'; import { getUserInfo, getIsAuth } from '../../utils/getUser'; class Info extends Component { config = { navigationBarTitleText: 'TARO商城', enablePullDownRefresh: true, backgroundTextStyle: 'dark', disableScroll: true } constructor() { super(...arguments) this.state = { animationClass: '', showAuthModal: false, shouldIndexHidden: false, } this.env = process.env.TARO_ENV } hideAuthModal() { this.setState({ showAuthModal: false }) Taro.setStorage({ key: 'isHomeLongHideAuthModal', data: true }) } onProcessAuthResult = (userData) => { Taro.setStorage({ key: 'isHomeLongHideAuthModal', data: true }) if (userData.userInfo) { setGlobalData('userData', userData) } this.setState({ showAuthModal: false }) getIsAuth() } async onPullDownRefresh() { if (this.state.shouldIndexHidden) { Taro.stopPullDownRefresh() // 中止下拉刷新 } else { await this.props.onFetchIndexList() Taro.stopPullDownRefresh() // 中止下拉刷新 } } componentDidMount() { if (this.env === 'weapp') { // 用類名來控制動畫 setTimeout(async () => { const userData = await getUserInfo(); Taro.getStorage({ key: 'isHomeLongHideAuthModal', success: (res) => { const isHomeLongHideAuthModal = res.data; let showAuthModal if (!userData && !this.state.showAuthModal && !isHomeLongHideAuthModal) { showAuthModal = true } else { showAuthModal = false } this.setState({ animationClass: 'animation', showAuthModal }) }, fail: () => { let showAuthModal if (!userData && !this.state.showAuthModal) { showAuthModal = true } else { showAuthModal = false } this.setState({ animationClass: 'animation', showAuthModal }) } }) }, 1000) getIsAuth() } else if (this.env === 'h5' || this.env === 'rn') { console.log('h5登陸') } } render() { const { animationClass, shouldIndexHidden, showAuthModal } = this.state const { loginname, 
avatar_url } = this.props; const indexClassNames = classnames('container', 'index', animationClass, { hidden: shouldIndexHidden }) return ( <View className={indexClassNames}> <View className='login-head'> <Image className='login-head-back' src={require('../../assets/img/loginBack.jpg')} /> <Image className='login-head-head' src={avatar_url ? avatar_url : require('../../assets/img/head.png')} /> {loginname ? <Text classnames='login-head-name'>{loginname}</Text> : null} </View> {showAuthModal && <Modal title='受權提示' contentText='誠邀您完成受權,尊享暢遊體驗' onCancelCallback={this.hideAuthModal.bind(this)} onConfirmCallback={this.onProcessAuthResult.bind(this)} isAuth />} </View> ) } } export default Info
我們是如何保證這個應用只有一次授權彈窗呢? 關鍵代碼是Taro.setStorage({ key: 'isHomeLongHideAuthModal', data: true })
,如果彈出了一次,就在本地存一個標記已經彈過授權框,下一次彈窗之前可以根據此判斷。至此我們完成了授權處理,但如果可以的話還是要優雅一些,在需要的時候才徵求用戶授權,保證用戶體驗。(摘自Taro 多端開發實現原理與項目實戰)
/src/utils/globalData.js
// Process-wide key/value store. Created without a prototype so that keys
// such as 'constructor' or 'toString' are not reported as stored values.
const globalData = Object.create(null)

/**
 * Stores a value under the given key, replacing any previous value.
 * @param {string} key
 * @param {*} val
 */
export function setGlobalData(key, val) {
  globalData[key] = val
}

/**
 * Returns the value stored under the given key.
 * @param {string} key
 * @returns {*} the stored value, or undefined when the key was never set
 */
export function getGlobalData(key) {
  return globalData[key]
}
/src/utils/request.js
import Taro from '@tarojs/taro';
import '@tarojs/async-await';

/**
 * Runs Taro.request with the loading indicator shown for its duration.
 * The indicator is hidden in `finally`, so a rejected request no longer
 * leaves it on screen.
 * @param {object} options - options passed straight to Taro.request
 * @returns {Promise<object>} the Taro.request result
 */
async function requestWithLoading(options) {
  Taro.showLoading();
  try {
    return await Taro.request(options);
  } finally {
    Taro.hideLoading();
  }
}

/**
 * Sends a GET request.
 * @param {string} url
 * @param {object} data - request parameters
 * @returns {Promise<object>} the Taro.request result
 */
export function getJSON(url, data) {
  return requestWithLoading({ url: url, data: data, method: 'GET' });
}

/**
 * Sends a POST request with a JSON content type.
 * @param {string} url
 * @param {object} data - request body
 * @returns {Promise<object>} the Taro.request result
 */
export function postJSON(url, data) {
  return requestWithLoading({
    header: { 'content-type': 'application/json' },
    url: url,
    data: data,
    method: 'POST'
  });
}
/src/constants/api
// Base URL of the backend API.
const rootPath = 'http://127.0.0.1:5000/v1';

// Endpoint map shared by the whole app; frozen so that no caller can
// change an entry by accident.
const apiObject = Object.freeze({
  registerclient: `${rootPath}/client/register`, // register a user
  getusertoken: `${rootPath}/token`, // fetch the user token after login
  checkusertoken: `${rootPath}/token/secret`, // verify a user token
  getuserinfo: `${rootPath}/user`, // fetch user information
});

export default apiObject;
/src/utils/getUser.js
import Taro from '@tarojs/taro'
import { getGlobalData } from './globalData'
import api from '../constants/api';
import { postJSON } from '../utils/request';

/**
 * Returns the cached user data from global data when present, otherwise
 * asks Taro.getUserInfo().
 * @returns {Promise<object|null>} the user data, or null when
 *   Taro.getUserInfo() rejects
 */
async function getUserInfo() {
  const userData = getGlobalData('userData')
  if (userData) {
    return userData
  }
  try {
    const _userData = await Taro.getUserInfo()
    return _userData
  } catch (err) {
    console.log(err)
    console.log('微信登陸或用戶接口故障')
    return null
  }
}

/**
 * Registers the WeChat user with the backend, then logs in and stores the
 * returned token under the 'token' storage key.
 * @returns {Promise<boolean>} true only when a token was obtained
 */
async function getIsAuth() {
  // getUserInfo() may return null, so it must not be destructured directly.
  const userData = await getUserInfo()
  const userInfo = userData ? userData.userInfo : null
  let isAuth = false
  if (userInfo) {
    try {
      isAuth = await registerAndLogin(userInfo)
    } catch (err) {
      // Network or backend failure: report it instead of rejecting.
      console.log(err)
    }
    if (isAuth) {
      userInfo.isAuth = true
      Taro.showToast({ title: '受權成功' })
    } else {
      Taro.showToast({ title: '受權失敗,請稍後再試', icon: 'none' })
    }
  }
  console.log('isAuth: ', isAuth)
  return isAuth
}

// Registers the user, then exchanges a fresh login code for a token.
// Returns true when the token was stored.
async function registerAndLogin(userInfo) {
  const loginRes = await Taro.login()
  // Register a new user with the WeChat profile.
  const result = await postJSON(api.registerclient, {
    "avatar": userInfo.avatarUrl,
    "sex": userInfo.gender,
    "nickname": userInfo.nickName,
    "account": loginRes.code,
    "type": 200
  });
  if (!result.data || result.data.error_code !== 0) {
    return false
  }
  // Log the user in with a second code, as the original flow does.
  const tokenRes = await Taro.login()
  const auth_token = await postJSON(api.getusertoken, {
    "account": tokenRes.code,
    "type": 200
  })
  if (auth_token.statusCode !== 201) {
    return false
  }
  // Cache the token on the client.
  Taro.setStorage({ key: 'token', data: auth_token.data.token })
  return true
}

export { getUserInfo, getIsAuth }
├── app │ ├── __init__.py │ ├── api │ │ ├── __init__.py │ │ └── v1 │ │ ├── __init__.py │ │ ├── client.py │ │ ├── token.py │ │ └── user.py │ ├── apps.py │ ├── config │ │ ├── secure.py │ │ └── settings.py │ ├── libs │ │ ├── enums.py │ │ ├── error.py │ │ ├── error_code.py │ │ ├── format_time.py │ │ ├── get_openid.py │ │ ├── redprint.py │ │ ├── scope.py │ │ └── token_auth.py │ ├── models │ │ ├── __init__.py │ │ ├── __pycache__ │ │ │ ├── __init__.cpython-36.pyc │ │ │ ├── base.cpython-36.pyc │ │ │ └── user.cpython-36.pyc │ │ ├── base.py │ │ └── user.py │ └── validators │ ├── __init__.py │ ├── base.py │ └── forms.py ├── manage.py └── requirements.txt
requirements.txt
Flask Flask-SQLAlchemy psycopg2-binary cymysql Flask-Testing coverage flake8 flask-debugtoolbar flask-cors flask-migrate flask-bcrypt pyjwt gunicorn requests flask-httpauth flask-wtf
app
目錄和__init__.py
文件和apps.py
# File: /app/apps.py
# -*- coding: utf-8 -*-
from flask import Flask as _Flask
from flask.json import JSONEncoder as _JSONEncoder
from app.libs.error_code import ServerError
from datetime import date


class JSONEncoder(_JSONEncoder):
    """JSON encoder that also handles mapping-like objects and dates."""

    def default(self, o):
        """Serialize `o`, raising ServerError for unsupported objects.

        Objects exposing both `keys` and `__getitem__` are converted with
        dict(o); `date` instances are formatted as YYYY-MM-DD.
        """
        looks_like_mapping = hasattr(o, 'keys') and hasattr(o, '__getitem__')
        if looks_like_mapping:
            return dict(o)
        if not isinstance(o, date):
            raise ServerError()
        return o.strftime('%Y-%m-%d')


class Flask(_Flask):
    """Flask application class wired to the custom JSONEncoder."""

    json_encoder = JSONEncoder
# File: /app/__init__.py
# -*- coding: utf-8 -*-
from .apps import Flask
from flask_debugtoolbar import DebugToolbarExtension
from flask_cors import CORS
from flask_migrate import Migrate
from flask_bcrypt import Bcrypt
from app.models.base import db

# Extension instances, bound to the application inside create_app().
toolbar = DebugToolbarExtension()
migrate = Migrate(db=db)
bcrypt = Bcrypt()


def register_blueprints(app):
    """Mount the v1 API blueprint under the /v1 prefix."""
    # Imported here rather than at module level, as in the original.
    from app.api.v1 import create_blueprint_v1

    blueprint = create_blueprint_v1()
    app.register_blueprint(blueprint, url_prefix='/v1')


def register_plugin(app):
    """Bind the database to the app and create all tables."""
    db.init_app(app)
    with app.app_context():
        db.create_all()


def create_app():
    """Build and configure the Flask application."""
    # instantiate the app
    app = Flask(__name__)

    # enable CORS
    CORS(app)

    # set config
    for config_module in ('app.config.settings', 'app.config.secure'):
        app.config.from_object(config_module)

    # set up extensions
    toolbar.init_app(app)
    migrate.init_app(app, db)
    bcrypt.init_app(app)

    # register blueprints
    register_blueprints(app)
    register_plugin(app)

    # shell context for flask cli
    @app.shell_context_processor
    def ctx():
        return {'app': app, 'db': db}

    return app
/app/config/目錄,在這個目錄下新建兩個文件settings.py和secure.py
# File: /app/config/settings.py
# -*- coding: utf-8 -*-

# TOKEN_EXPIRATION = 30 * 24 * 3600

# A real boolean: the string 'true' only worked because any non-empty
# string is truthy (the string 'false' would have enabled debug as well).
DEBUG = True

# Token lifetime, read by generate_auth_token as timedelta(days, seconds).
TOKEN_EXPIRATION_DAYS = 30
TOKEN_EXPIRATION_SECONDS = 0

# Complexity (log rounds) of the bcrypt hashing; the default value is 12.
BCRYPT_LOG_ROUNDS = 4
# File: /app/config/secure.py
# -*- coding: utf-8 -*-
import os

# Secrets can be supplied through the environment so that real credentials
# do not have to live in the source tree; the literals are the original
# development defaults.
SQLALCHEMY_DATABASE_URI = os.environ.get(
    'SQLALCHEMY_DATABASE_URI',
    'mysql+cymysql://root:root1234@localhost/flask-rest')

# Key used to sign and verify the JWT tokens.
SECRET_KEY = os.environ.get('SECRET_KEY', '***')

SQLALCHEMY_TRACK_MODIFICATIONS = True

# WeChat mini-program credentials, read by get_openid().
MINA_APP = {
    'AppID': os.environ.get('MINA_APP_ID', '***'),
    'AppSecret': os.environ.get('MINA_APP_SECRET', '***')
}
manage.py
#File: /manage.py
# -*- coding: utf-8 -*-
from werkzeug.exceptions import HTTPException
from app import create_app
from app.libs.error import APIException
from app.libs.error_code import ServerError

app = create_app()


@app.errorhandler(Exception)
def framework_error(e):
    """Global exception handler: turn every error into a JSON response.

    APIException instances are returned unchanged, other HTTP exceptions
    are wrapped into an APIException with error_code 1007. Any other
    exception is re-raised in debug mode, so the traceback stays visible
    during development, and replaced by a generic ServerError otherwise.
    The original check was inverted: it hid tracebacks in debug mode and
    re-raised in production.
    """
    if isinstance(e, APIException):
        return e
    if isinstance(e, HTTPException):
        code = e.code
        msg = e.description
        error_code = 1007
        return APIException(msg, code, error_code)
    if app.config['DEBUG']:
        raise e
    return ServerError()


if __name__ == '__main__':
    app.run()
新建文件夾 /app/libs/
#File: /app/libs/error.py
# -*- coding: utf-8 -*-
"""
Custom error definitions.
"""
from flask import request, json
from werkzeug.exceptions import HTTPException


class APIException(HTTPException):
    """API error whose response body is JSON.

    The body carries `msg`, `error_code` and `request` (HTTP method plus
    the path without the query string).
    """
    code = 500
    msg = '抱歉,後臺發生了錯誤 (* ̄︶ ̄)!'
    error_code = 999

    def __init__(self, msg=None, code=None, error_code=None, headers=None):
        """Override the class-level defaults for this instance.

        `headers` is accepted for signature compatibility but not used.
        """
        # `is not None` lets an explicit 0 (the success error_code)
        # override the class default; a truthiness test dropped it.
        if code is not None:
            self.code = code
        if error_code is not None:
            self.error_code = error_code
        if msg:
            self.msg = msg
        super(APIException, self).__init__(msg, None)

    def get_body(self, environ=None, scope=None):
        """Return the JSON text of the error.

        `scope` is accepted because newer Werkzeug versions pass it.
        """
        body = dict(
            msg=self.msg,
            error_code=self.error_code,
            request=request.method + ' ' + self.get_url_no_param()
        )
        text = json.dumps(body)
        return text

    def get_headers(self, environ=None, scope=None):
        """Return the response headers, declaring a JSON content type."""
        return [('Content-Type', 'application/json')]

    @staticmethod
    def get_url_no_param():
        """Return the request path with the query string removed."""
        full_path = str(request.full_path)
        main_path = full_path.split('?')
        return main_path[0]
#File: /app/libs/error_code.py
# -*- coding: utf-8 -*-
from werkzeug.exceptions import HTTPException
from app.libs.error import APIException


class Success(APIException):
    """Successful operation (HTTP 201, error_code 0)."""
    code = 201
    msg = 'success'
    error_code = 0


class DeleteSuccess(Success):
    """Successful deletion (HTTP 202, error_code 1)."""
    code = 202
    error_code = 1


class ServerError(APIException):
    """Generic server-side failure."""
    code = 500
    msg = '抱歉,後臺發生了錯誤 (* ̄︶ ̄)!'
    error_code = 999


class ClientTypeError(APIException):
    """The client type could not be determined."""
    code = 400
    msg = '未檢測到客戶端類型'
    error_code = 1006


class ParameterException(APIException):
    """Request parameters are invalid."""
    code = 400
    msg = '無效參數'
    error_code = 1000


class NotFound(APIException):
    """The requested resource does not exist."""
    code = 404
    msg = '沒有找到對應的資源 O__O...'
    error_code = 1001


class AuthFailed(APIException):
    """Authentication failed."""
    code = 401
    msg = '認證失敗'
    error_code = 1005


class Forbidden(APIException):
    """The caller's scope does not allow the request."""
    code = 403
    msg = '禁止訪問,不在對應權限內'
    error_code = 1004


class SingleLogin(APIException):
    """The token no longer matches the stored login time."""
    code = 400
    msg = '請從新登陸'
    error_code = 2002


class DuplicateAct(APIException):
    """The operation was repeated."""
    code = 400
    msg = '請勿重複操做'
    error_code = 2001
#File: /app/libs/redprint.py
# -*- coding: utf-8 -*-


class Redprint:
    """Collects routes and later registers them on a blueprint."""

    def __init__(self, name):
        self.name = name
        # Pending (view function, rule, options) triples.
        self.mound = []

    def route(self, rule, **options):
        """Decorator that records a view function for later registration."""
        def decorator(f):
            self.mound.append((f, rule, options))
            return f
        return decorator

    def register(self, bp, url_prefix=None):
        """Add every recorded route to `bp`.

        The URL is `url_prefix + rule` (the prefix defaults to
        '/<name>') and the endpoint is '<name>+<endpoint>', where
        <endpoint> is the `endpoint` option or the view function name.
        """
        if url_prefix is None:
            url_prefix = '/' + self.name
        for f, rule, options in self.mound:
            # Work on a copy: popping from the stored dict would make a
            # second register() call lose a custom endpoint.
            rule_options = dict(options)
            endpoint = self.name + '+' + \
                rule_options.pop("endpoint", f.__name__)
            bp.add_url_rule(url_prefix + rule, endpoint, f, **rule_options)
#File: /app/api/v1/__init__.py
# -*- coding: utf-8 -*-
from flask import Blueprint
from app.api.v1 import user, client, token


def create_blueprint_v1():
    """Create the 'v1' blueprint and attach the user, client and token
    redprints to it, in that order."""
    bp_v1 = Blueprint('v1', __name__)
    for module in (user, client, token):
        module.api.register(bp_v1)
    return bp_v1
client.py
#File: /app/api/v1/client.py
# -*- coding: utf-8 -*-
from app.libs.error_code import Success, ParameterException, ClientTypeError
from app.libs.redprint import Redprint
from app.models.user import User
from app.validators.forms import ClientForm, UserEmailForm, MinaForm
from app.libs.enums import ClientTypeEnum
from app.libs.get_openid import get_openid

api = Redprint('client')


@api.route('/register', methods=['POST'])
def create_client():
    """Register a client, dispatching on the validated client type."""
    form = ClientForm().validate_for_api()
    promise = {
        ClientTypeEnum.USER_EMAIL: __register_user_by_email,
        ClientTypeEnum.USER_MINA: __register_user_by_mina,
    }
    register = promise.get(form.type.data)
    if register is None:
        # A valid enum member without a registration handler used to
        # surface as a KeyError (HTTP 500).
        raise ClientTypeError()
    register()
    return Success()


def __register_user_by_email():
    """Register a user from e-mail, password and nickname."""
    form = UserEmailForm().validate_for_api()
    User.register_by_email(form.nickname.data,
                           form.account.data,
                           form.secret.data)


def __register_user_by_mina():
    """Register a mini-program user; `account` carries the login code
    that get_openid() exchanges for an openid."""
    form = MinaForm().validate_for_api()
    account = get_openid(form.account.data)
    if account is None:
        raise ParameterException
    User.register_by_mina(form.nickname.data,
                          account,
                          form.sex.data,
                          form.avatar.data)
token.py
#File: /app/api/v1/token.py
# -*- coding: utf-8 -*-
import jwt
import datetime
from flask import current_app, jsonify
from app.libs.enums import ClientTypeEnum
from app.libs.error_code import AuthFailed, ClientTypeError
from app.libs.redprint import Redprint
from app.models.user import User
from app.validators.forms import ClientForm, TokenForm
from app.libs.format_time import get_format_timestamp

api = Redprint('token')

# Algorithm used for signing; also the only one accepted when decoding.
JWT_ALGORITHM = 'HS256'


@api.route('', methods=['POST'])
def get_token():
    """Login: return a token when authentication succeeds."""
    form = ClientForm().validate_for_api()
    promise = {
        ClientTypeEnum.USER_EMAIL: User.verify,
        ClientTypeEnum.USER_MINA: User.mina_login,
    }
    client_type = ClientTypeEnum(form.type.data)
    if client_type not in promise:
        # Valid enum member without a login handler: answer with a 400
        # instead of a KeyError.
        raise ClientTypeError()
    identity = promise[client_type](
        form.account.data,
        form.secret.data
    )
    # Token
    token = generate_auth_token(identity['uid'],
                                client_type,
                                identity['login_time'],
                                identity['scope'])
    # jwt.encode returns bytes or str depending on the PyJWT version.
    if isinstance(token, bytes):
        token = token.decode('ascii')
    t = {'token': token}
    return jsonify(t), 201


@api.route('/secret', methods=['POST'])
def get_token_info():
    """Return the information carried by a token."""
    form = TokenForm().validate_for_api()
    auth_token = form.token.data
    try:
        # Pin the accepted algorithm explicitly.
        data = jwt.decode(auth_token,
                          current_app.config['SECRET_KEY'],
                          algorithms=[JWT_ALGORITHM])
    except jwt.ExpiredSignatureError:
        raise AuthFailed(msg='token is expired', error_code=1003)
    except jwt.InvalidTokenError:
        raise AuthFailed(msg='token is invalid', error_code=1002)
    r = {
        'scope': data['scope'],
        'create_at': get_format_timestamp(data['iat']),
        'expire_in': get_format_timestamp(data['exp']),
        'uid': data['uid'],
        'login_time': get_format_timestamp(data['login_time'])
    }
    return jsonify(r)


def generate_auth_token(uid, ac_type, login_time, scope=None):
    """Create a signed token.

    `ac_type` must expose `.value` (a ClientTypeEnum member). Errors now
    propagate; the original returned the exception object, which the
    caller then tried to decode as a token.
    """
    now = datetime.datetime.utcnow()
    lifetime = datetime.timedelta(
        days=current_app.config['TOKEN_EXPIRATION_DAYS'],
        seconds=current_app.config['TOKEN_EXPIRATION_SECONDS'],
    )
    payload = {
        'exp': now + lifetime,
        'iat': now,
        'uid': uid,
        'type': ac_type.value,
        'login_time': login_time,
        'scope': scope,
    }
    return jwt.encode(
        payload,
        current_app.config['SECRET_KEY'],
        algorithm=JWT_ALGORITHM
    )
user.py
#File: /app/api/v1/user.py
# -*- coding: utf-8 -*-
from flask import jsonify, g
from app.libs.error_code import DeleteSuccess
from app.libs.redprint import Redprint
from app.libs.token_auth import auth
from app.models.base import db
from app.models.user import User

api = Redprint('user')


@api.route('/<int:uid>', methods=['GET'])
@auth.login_required
def super_get_user(uid):
    """Return the user with the given id."""
    user = User.query.filter_by(id=uid).first_or_404()
    return jsonify(user)


@api.route('', methods=['GET'])
@auth.login_required
def get_user():
    """Return the user identified by the token."""
    uid = g.user.uid
    user = User.query.filter_by(id=uid).first_or_404()
    return jsonify(user)


@api.route('/<int:uid>', methods=['DELETE'])
@auth.login_required
def super_delete_user(uid):
    """Soft-delete the user with the given id.

    Authentication is required: without the decorator any anonymous
    caller could delete any user, and the scope check that lists this
    endpoint as forbidden for UserScope never ran.
    """
    with db.auto_commit():
        user = User.query.filter_by(id=uid).first_or_404()
        user.delete()
    return DeleteSuccess()


@api.route('', methods=['DELETE'])
@auth.login_required
def delete_user():
    """Soft-delete the user identified by the token."""
    uid = g.user.uid
    with db.auto_commit():
        user = User.query.filter_by(id=uid).first_or_404()
        user.delete()
    return DeleteSuccess()


@api.route('', methods=['PUT'])
def update_user():
    """Placeholder: returns the literal string 'update'."""
    return 'update'
models
#File: /app/models/base.py
# -*- coding: utf-8 -*-
from datetime import datetime
from flask_sqlalchemy import SQLAlchemy as _SQLAlchemy, BaseQuery
from sqlalchemy import inspect, Column, Integer, SmallInteger, orm
from contextlib import contextmanager
from app.libs.error_code import NotFound


class SQLAlchemy(_SQLAlchemy):
    """SQLAlchemy extension with a commit-or-rollback context manager."""

    @contextmanager
    def auto_commit(self):
        """Commit the session after the `with` body; roll back on error.

        The rollback uses this instance's own session rather than the
        module-level `db`, and the exception is re-raised unchanged.
        """
        try:
            yield
            self.session.commit()
        except Exception:
            self.session.rollback()
            raise


class Query(BaseQuery):
    """Query that filters on status=1 unless a status is given."""

    def filter_by(self, **kwargs):
        if 'status' not in kwargs.keys():
            kwargs['status'] = 1
        return super(Query, self).filter_by(**kwargs)

    def get_or_404(self, ident):
        """Like get(), but raise the JSON NotFound error when missing."""
        rv = self.get(ident)
        if not rv:
            raise NotFound()
        return rv

    def first_or_404(self):
        """Like first(), but raise the JSON NotFound error when missing."""
        rv = self.first()
        if not rv:
            raise NotFound()
        return rv


db = SQLAlchemy(query_class=Query)


class Base(db.Model):
    """Abstract model with a creation time and a soft-delete status."""
    __abstract__ = True
    create_time = Column(Integer)
    status = Column(SmallInteger, default=1)

    def __init__(self):
        self.create_time = int(datetime.now().timestamp())

    def __getitem__(self, item):
        return getattr(self, item)

    @property
    def create_datetime(self):
        """Creation time as a datetime, or None when it is not set."""
        if self.create_time:
            return datetime.fromtimestamp(self.create_time)
        else:
            return None

    def set_attrs(self, attrs_dict):
        """Copy matching keys onto the instance, never touching `id`."""
        for key, value in attrs_dict.items():
            if hasattr(self, key) and key != 'id':
                setattr(self, key, value)

    def delete(self):
        """Soft-delete: set status to 0."""
        self.status = 0

    def active(self):
        """Re-activate: set status to 1."""
        self.status = 1

    def update(self):
        """Commit the session; on failure roll back and return the error
        text instead of raising."""
        try:
            db.session.commit()
        except Exception as e:
            db.session.rollback()
            return str(e)

    # NOTE(review): `fields` is not defined in this class; keys(), hide()
    # and append() presumably rely on a subclass providing it - confirm.
    def keys(self):
        return self.fields

    def hide(self, *keys):
        for key in keys:
            self.fields.remove(key)
        return self

    def append(self, *keys):
        for key in keys:
            self.fields.append(key)
        return self


class MixinJSONSerializer:
    """Mixin exposing column names through keys() / __getitem__."""

    @orm.reconstructor
    def init_on_load(self):
        self._fields = []
        # self._include = []
        self._exclude = []
        self._set_fields()
        self.__prune_fields()

    def _set_fields(self):
        """Hook for subclasses; does nothing here."""
        pass

    def __prune_fields(self):
        # When no field list was set, use every column not in _exclude.
        columns = inspect(self.__class__).columns
        if not self._fields:
            all_columns = set(columns.keys())
            self._fields = list(all_columns - set(self._exclude))

    def hide(self, *args):
        for key in args:
            self._fields.remove(key)
        return self

    def keys(self):
        return self._fields

    def __getitem__(self, key):
        return getattr(self, key)
#File: /app/models/user.py
# -*- coding: utf-8 -*-
from datetime import datetime
from flask import current_app
from sqlalchemy import Column, Integer, String, SmallInteger
from app import bcrypt
from app.libs.error_code import AuthFailed
from app.models.base import Base, db
from app.libs.format_time import get_current_timestamp
from app.libs.get_openid import get_openid
from app.libs.error_code import ParameterException


class User(Base):
    """User account, created either by e-mail or by mini-program openid."""
    id = Column(Integer, primary_key=True)
    nickname = Column(String(24), unique=True)
    email = Column(String(24), unique=True)
    mobile = Column(String(11), unique=True)
    sex = Column(Integer, default=0)  # 1 = male, 2 = female
    avatar = Column(String(200))  # avatar URL
    register_ip = Column(String(100))  # IP used at registration
    auth = Column(SmallInteger, default=1)  # permission level
    openid = Column(String(80), unique=True)
    _password = Column('password', String(100))
    # A callable default is evaluated per insert; the original expression
    # was evaluated once at import, giving every row the same value.
    login_time = Column(Integer, default=get_current_timestamp)

    @property
    def login_datetime(self):
        """Last login as a datetime, or None when it is not set."""
        if self.login_time:
            return datetime.fromtimestamp(self.login_time)
        else:
            return None

    def keys(self):
        """Names of the attributes exposed when serializing the user."""
        return ['id', 'nickname', 'email', 'auth']

    @property
    def password(self):
        return self._password

    @password.setter
    def password(self, raw):
        """Store the bcrypt hash of the raw password."""
        self._password = bcrypt.generate_password_hash(
            raw, current_app.config['BCRYPT_LOG_ROUNDS']).decode('utf-8')

    @staticmethod
    def register_by_email(nickname, account, secret):
        """Register with an e-mail address."""
        with db.auto_commit():
            user = User()
            user.nickname = nickname
            user.email = account
            user.password = secret
            db.session.add(user)

    @staticmethod
    def verify(email, password):
        """Log in with an e-mail address.

        Returns a dict with `uid`, `scope` and `login_time`; raises
        AuthFailed when the password does not match.
        """
        user = User.query.filter_by(email=email).first_or_404()
        if not user.check_password(password):
            raise AuthFailed()
        return User._record_login(user)

    def check_password(self, raw):
        """Return whether `raw` matches the stored hash."""
        if not self._password:
            return False
        return bcrypt.check_password_hash(self._password, raw)

    @staticmethod
    def register_by_mina(nickname, account, sex, avatar):
        """Register through the mini-program; `account` is the openid."""
        with db.auto_commit():
            user = User()
            user.nickname = nickname
            user.openid = account
            user.sex = sex
            user.avatar = avatar
            db.session.add(user)

    @staticmethod
    def mina_login(account, secret):
        """Log in through the mini-program.

        `account` is the login code exchanged for an openid; `secret` is
        accepted to match verify() but not used.
        """
        openid = get_openid(account)  # exchange the code for an openid
        if openid is None:
            raise ParameterException
        user = User.query.filter_by(openid=openid).first_or_404()
        return User._record_login(user)

    @staticmethod
    def _record_login(user):
        """Stamp the login time on `user`, commit, and build the identity."""
        scope = 'AdminScope' if user.auth == 2 else 'UserScope'
        login_time = get_current_timestamp()
        user.login_time = login_time
        # Called on the instance; the original passed the class as `self`.
        user.update()
        return {'uid': user.id, 'scope': scope, 'login_time': login_time}
# File: /app/libs/enums.py
# -*- coding: utf-8 -*-
from enum import Enum, unique


@unique
class ClientTypeEnum(Enum):
    """Client types accepted by the registration and token endpoints.

    @unique makes a duplicated value fail at import time instead of
    silently becoming an alias of an existing member.
    """
    USER_EMAIL = 100
    USER_MOBILE = 101

    # WeChat mini-program
    USER_MINA = 200
    # WeChat official account
    USER_WX = 201
#File: /app/libs/format_time.py
# -*- coding: utf-8 -*-
import datetime


def get_current_date():
    """Return the current local date and time."""
    return datetime.datetime.now()


def get_current_timestamp():
    """Return the current time as an integer Unix timestamp."""
    return int(datetime.datetime.now().timestamp())


def get_format_date(date=None, format_time="%Y-%m-%d %H:%M:%S"):
    """Format a datetime; the current time is used when `date` is None."""
    if date is None:
        date = datetime.datetime.now()
    return date.strftime(format_time)


def get_format_timestamp(date=None, format_time="%Y-%m-%d %H:%M:%S"):
    """Format a Unix timestamp; the current time is used when `date` is
    None.

    The original default substituted a datetime object, which
    fromtimestamp() rejects with a TypeError.
    """
    if date is None:
        date = datetime.datetime.now().timestamp()
    return datetime.datetime.fromtimestamp(date).strftime(format_time)
#File: /app/libs/get_openid.py
# -*- coding: utf-8 -*-
import requests
import json
from flask import current_app


def get_openid(code):
    """Exchange a mini-program login code for the user's openid.

    Returns the `openid` value of the jscode2session response, or None
    when the response has no such key, the request fails, or the body is
    not valid JSON.
    """
    api = 'https://api.weixin.qq.com/sns/jscode2session'
    mina_app = current_app.config['MINA_APP']
    # Let requests build and URL-encode the query string.
    params = {
        'appid': mina_app['AppID'],
        'secret': mina_app['AppSecret'],
        'js_code': code,
        'grant_type': 'authorization_code',
    }
    try:
        # A timeout keeps a stalled upstream call from blocking the worker.
        response = requests.get(api, params=params, timeout=5)
        res = json.loads(response.text)
    except (requests.RequestException, ValueError):
        return None
    if isinstance(res, dict) and 'openid' in res:
        return res['openid']
    return None
scope.py
# 權限管理函數 (permission-management helpers)
#File: /app/libs/scope.py
# -*- coding: utf-8 -*-


class Scope:
    """Base permission scope: allowed endpoints, allowed modules and
    explicitly forbidden endpoints."""
    allow_api = []
    allow_module = []
    forbidden = []

    def __add__(self, other):
        """Merge `other` into this scope, de-duplicating, and return self.

        The merged lists are assigned on the instance, so the class-level
        lists are left untouched.
        """
        self.allow_api = list(set(self.allow_api + other.allow_api))
        self.allow_module = list(set(self.allow_module + other.allow_module))
        self.forbidden = list(set(self.forbidden + other.forbidden))
        return self


class AdminScope(Scope):
    """Scope allowing every endpoint of the 'v1.user' module."""
    allow_module = ['v1.user']

    def __init__(self):
        pass


class UserScope(Scope):
    """AdminScope's permissions minus the two `super_*` user endpoints."""
    forbidden = ['v1.user+super_get_user',
                 'v1.user+super_delete_user']

    def __init__(self):
        self + AdminScope()


# Scope names that may appear in a token, mapped to their classes.
_SCOPES = {cls.__name__: cls for cls in (AdminScope, UserScope)}


def is_in_scope(scope, endpoint):
    """Return whether the scope named `scope` may access `endpoint`.

    `endpoint` has the form '<module>+<view>'. Forbidden endpoints win
    over allowed endpoints, which win over allowed modules. An unknown
    scope name is denied; the original looked the name up in globals(),
    raising KeyError for unknown names and calling whatever module-level
    object happened to match.
    """
    scope_cls = _SCOPES.get(scope)
    if scope_cls is None:
        return False
    scope = scope_cls()
    red_name = endpoint.split('+')[0]
    if endpoint in scope.forbidden:
        return False
    if endpoint in scope.allow_api:
        return True
    return red_name in scope.allow_module
#File: /app/libs/token_auth.py
# -*- coding: utf-8 -*-
import jwt
from collections import namedtuple
from flask import current_app, g, request
from flask_httpauth import HTTPBasicAuth
from app.models.user import User as _User
from app.libs.scope import is_in_scope
from app.libs.error_code import AuthFailed, Forbidden, SingleLogin

auth = HTTPBasicAuth()
User = namedtuple('User', ['uid', 'ac_type', 'scope', 'login_time'])


@auth.verify_password
def verify_password(token, password):
    """HTTP basic-auth callback: the username field carries the token.

    `password` is not used. On success the decoded identity is stored in
    g.user.
    """
    user_info = verify_auth_token(token)
    if not user_info:
        return False
    g.user = user_info
    return True


def verify_auth_token(token):
    """Decode the token and check login time and scope.

    Raises AuthFailed for an expired or invalid token, SingleLogin when
    the token's login_time differs from the stored one, and Forbidden
    when the scope does not allow the current endpoint.
    """
    try:
        # Pin the accepted algorithm; tokens are issued with HS256.
        data = jwt.decode(token,
                          current_app.config['SECRET_KEY'],
                          algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        raise AuthFailed(msg='token is expired', error_code=1003)
    except jwt.InvalidTokenError:
        raise AuthFailed(msg='token is invalid', error_code=1002)
    uid = data['uid']
    ac_type = data['type']
    scope = data['scope']
    login_time = data['login_time']
    user = _User.query.filter_by(id=uid).first_or_404()
    if login_time != user.login_time:
        raise SingleLogin()
    # request.endpoint names the view function being called.
    allow = is_in_scope(scope, request.endpoint)
    if not allow:
        raise Forbidden()
    return User(uid, ac_type, scope, login_time)
#File: /app/validators/base.py
# -*- coding: utf-8 -*-
from flask import request
from wtforms import Form
from app.libs.error_code import ParameterException


class BaseForm(Form):
    """Form populated from the request's JSON body and query arguments."""

    def __init__(self):
        json_body = request.get_json(silent=True)
        query_args = request.args.to_dict()
        super(BaseForm, self).__init__(data=json_body, **query_args)

    def validate_for_api(self):
        """Validate and return the form itself; raise ParameterException
        carrying the form errors when validation fails."""
        if super(BaseForm, self).validate():
            return self
        raise ParameterException(msg=self.errors)
#File: /app/validators/forms.py
# -*- coding: utf-8 -*-
from wtforms import StringField, IntegerField
from wtforms.validators import DataRequired, length, Email, Regexp
from wtforms import ValidationError
from app.libs.enums import ClientTypeEnum
from app.models.user import User
from app.validators.base import BaseForm as Form


def _coerce_client_type(field):
    """Replace field.data with the matching ClientTypeEnum member.

    Raises ValidationError with a message for an unknown value, so the
    failure is reported as a form error.
    """
    try:
        field.data = ClientTypeEnum(field.data)
    except ValueError:
        raise ValidationError('unsupported client type')


class ClientForm(Form):
    """Fields shared by registration and login requests."""
    account = StringField(validators=[
        DataRequired(message='不容許爲空'),
        length(min=5, max=32)])
    secret = StringField()
    type = IntegerField(validators=[DataRequired()])

    def validate_type(self, value):
        _coerce_client_type(value)


class UserEmailForm(ClientForm):
    """Registration by e-mail."""
    account = StringField(validators=[Email(message='invalidate email')])
    secret = StringField(validators=[
        DataRequired(),
        # 6 to 22 characters out of letters, digits and _ * & $ # @
        Regexp(r'^[A-Za-z0-9_*&$#@]{6,22}$')
    ])
    nickname = StringField(validators=[DataRequired(),
                                       length(min=2, max=22)])

    def validate_account(self, value):
        """Reject an e-mail address that is already registered."""
        if User.query.filter_by(email=value.data).first():
            raise ValidationError('email is already registered')


class TokenForm(Form):
    """Request body of the token-information endpoint."""
    token = StringField(validators=[DataRequired()])


class MinaForm(Form):
    """Registration through the mini-program."""
    account = StringField(validators=[
        DataRequired(message='不容許爲空'),
        length(min=10, max=80)])
    nickname = StringField(validators=[DataRequired()])
    sex = IntegerField(validators=[DataRequired()])
    avatar = StringField(validators=[DataRequired()])
    type = IntegerField(validators=[DataRequired()])

    def validate_type(self, value):
        _coerce_client_type(value)
error_code.md
error_code | msg |
---|---|
0 | 建立成功 |
1 | 刪除成功 |
999 | 未知錯誤 - 後臺發生了錯誤 |
1000 | 無效參數 |
1001 | 沒有找到對應的資源 |
1002 | token is invalid |
1003 | token is expired |
1004 | 禁止訪問,不在對應權限內 |
1005 | 認證失敗 |
1006 | 未檢測到客戶端類型 |
2001 | 請勿重複操做 |
2002 | 請從新登陸 |
Flask + PyJWT 實現基於Json Web Token的用戶認證受權
endpoint | HTTP Method | Authenticated? | Result | json Body |
---|---|---|---|---|
/v1/client/register | POST | NO | 註冊用戶 | {"account":"666@qq.com","secret":"123456","type":100,"nickname":"666"} |
/v1/token | POST | NO | 獲取token | {"account":"666@qq.com","secret":"123456","type":100,"nickname":"666"} |
/v1/user | GET | YES | 用戶詳情 | 空 |
/v1/user/2 | GET | YES | 管理員獲取用戶詳情 | 空 |
/v1/token/secret | POST | NO | token詳情 | {"token":"*"} |
cd users
flask db migrate
flask db upgrade
學習資料: