用 Flask 來寫個輕博客 (25) — 使用 Flask-Principal 實現角色權限功能

目錄

前文列表

用 Flask 來寫個輕博客 (1) — 建立項目
用 Flask 來寫個輕博客 (2) — Hello World!
用 Flask 來寫個輕博客 (3) — (M)VC_鏈接 MySQL 和 SQLAlchemy
用 Flask 來寫個輕博客 (4) — (M)VC_建立數據模型和表
用 Flask 來寫個輕博客 (5) — (M)VC_SQLAlchemy 的 CRUD 詳解
用 Flask 來寫個輕博客 (6) — (M)VC_models 的關係(one to many)
用 Flask 來寫個輕博客 (7) — (M)VC_models 的關係(many to many)
用 Flask 來寫個輕博客 (8) — (M)VC_Alembic 管理數據庫結構的升級和降級
用 Flask 來寫個輕博客 (9) — M(V)C_Jinja 語法基礎快速概覽
用 Flask 來寫個輕博客 (10) — M(V)C_Jinja 經常使用過濾器與 Flask 特殊變量及方法
用 Flask 來寫個輕博客 (11) — M(V)C_建立視圖函數
用 Flask 來寫個輕博客 (12) — M(V)C_編寫和繼承 Jinja 模板
用 Flask 來寫個輕博客 (13) — M(V)C_WTForms 服務端表單檢驗
用 Flask 來寫個輕博客 (14) — M(V)C_實現項目首頁的模板
用 Flask 來寫個輕博客 (15) — M(V)C_實現博文頁面評論表單
用 Flask 來寫個輕博客 (16) — MV(C)_Flask Blueprint 藍圖
用 Flask 來寫個輕博客 (17) — MV(C)_應用藍圖來重構項目
用 Flask 來寫個輕博客 (18) — 使用工廠模式來生成應用對象
用 Flask 來寫個輕博客 (19) — 以 Bcrypt 密文存儲帳戶信息與實現用戶登錄表單
用 Flask 來寫個輕博客 (20) — 實現註冊表單與應用 reCAPTCHA 來實現驗證碼
用 Flask 來寫個輕博客 (21) — 結合 reCAPTCHA 驗證碼實現用戶註冊與登陸
用 Flask 來寫個輕博客 (22) — 實現博客文章的添加和編輯頁面
用 Flask 來寫個輕博客 (23) — 應用 OAuth 來實現 Facebook 第三方登陸
用 Flask 來寫個輕博客 (24) — 使用 Flask-Login 來保護應用安全 css

擴展閱讀

Flask Principal — Flask Principal 0.4.0 documentation
Flask-Login — Flask-Login 0.4.0 documentationhtml

Flask-Principal

Flask-Principal 是一個 Flask 擴展(用戶權限框架), 框架主要部分包含身份(Identity),需求(Needs),權限(Permission),和包含身份信息的上下文環境(IdentityContext)。python

Flask 中的每個 user 都會擁有一種 Identity, 而每一種 Identity 又會被關聯到一個 Needs. Flask-Principal 提供了兩種 Needs(RoleNeed/UserNeed). Needs 本質上是一個 namedtuple(具名元組) EG. ("role", "admin"), 其定義了在這個 Identity 能作什麼事情. 也就是說 Permission 實際上是經過 Needs 來定義和初始化的, 其中 Permission 能夠是一個權限的集合.shell

這裏寫圖片描述

除此以外, Flask-Principal 是經過信號(signal)來與 Flask 應用進行交互的,知足了低耦合的理念。其定義了兩個重要的signal:數據庫

  • identity_changed:通常在用戶身份變化時發送該信號, 在用戶登陸認證成功後,經過發送 identity_changed 信號告知 Flask-Principal 用戶登陸成功, 須要對用戶的權限進行改變flask

  • identity_loaded:通常在用戶權限須要被載入時發送該信息. 一般在用戶身份改變時, 就須要載入相應的權限.安全

使用 Flask-Principal 來實現角色權限功能

添加 Role Model

並且 Role 和 User 應該是 many to many 的關係.ruby

  • models.py
users_roles = db.Table('users_roles',
    db.Column('user_id', db.String(45), db.ForeignKey('users.id')),
    db.Column('role_id', db.String(45), db.ForeignKey('roles.id')))


class User(db.Model):
    """Represents Proected users."""

    # Set the name for table
    __tablename__ = 'users'
    id = db.Column(db.String(45), primary_key=True)
    username = db.Column(db.String(255))
    password = db.Column(db.String(255))
    # one to many: User ==> Post 
    # Establish contact with Post's ForeignKey: user_id
    posts = db.relationship(
        'Post',
        backref='users',
        lazy='dynamic')

    roles = db.relationship(
        'Role',
        secondary=users_roles,
        backref=db.backref('users', lazy='dynamic'))

    def __init__(self, id, username, password):
        self.id = id
        self.username = username
        self.password = self.set_password(password)

        # Setup the default-role for user.
        default = Role.query.filter_by(name="default").one()
        self.roles.append(default)

    def __repr__(self):
        """Define the string format for instance of User."""
        return "<Model User `{}`>".format(self.username)

    def set_password(self, password):
        """Convert the password to cryptograph via flask-bcrypt"""
        return bcrypt.generate_password_hash(password)

    def check_password(self, password):
        return bcrypt.check_password_hash(self.password, password)

    def is_authenticated(self):
        """Check the user whether logged in."""

        # Check the User's instance whether Class AnonymousUserMixin's instance.
        if isinstance(self, AnonymousUserMixin):
            return False
        else:
            return True

    def is_active():
        """Check the user whether pass the activation process."""

        return True

    def is_anonymous(self):
        """Check the user's login status whether is anonymous."""

        if isinstance(self, AnonymousUserMixin):
            return True
        else:
            return False

    def get_id(self):
        """Get the user's uuid from database."""

        return unicode(self.id)


class Role(db.Model):
    """Represents Proected roles."""
    __tablename__ = 'roles'

    id = db.Column(db.String(45), primary_key=True)
    name = db.Column(db.String(255), unique=True)
    description = db.Column(db.String(255))

    def __init__(self, id, name):
        self.id = id
        self.name = name

    def __repr__(self):
        return "<Model Role `{}`>".format(self.name)

NOTE: 這樣的話咱們能夠爲 user 指定一個 role 集. 用來表明該用戶所擁有的 Identity, 這也是以後爲 user 對象綁定 Needs 所須要的前提.markdown

在 Manager shell 中手動的添加角色

  • 建立 roles 數據表
(env) jmilkfan@JmilkFan-Devstack:/opt/JmilkFan-s-Blog$ python manage.py shell
>>> db.create_all()
  • 建立新用戶
>>> from uuid import uuid4
>>> user = User(id=str(uuid4()), username='jmilkfan_2016', password="fanguiju")
>>> db.session.add(user)
>>> db.session.commit()
  • 建立新角色並與新用戶創建關聯
>>> role_admin = Role(id=str(uuid4()), name="admin")
>>> role_poster = Role(id=str(uuid4()), name="poster")
>>> role_default = Role(id=str(uuid4()), name="default")

>>> user
<Model User `jmilkfan_2016`>
>>> role_admin.users = [user]
>>> role_poster.users = [user]
>>> db.session.add(role_admin)
>>> db.session.add(role_poster)
>>> db.session.add(role_default)
>>> db.session.commit()

初始化 Flask-Principal 和 Permission

  • extensions.py
from flask.ext.principal import Principal, Permission, RoleNeed


# Create the Flask-Principal's instance
principals = Principal()

# 這裏設定了 3 種權限, 這些權限會被綁定到 Identity 以後纔會發揮做用.
# Init the role permission via RoleNeed(Need).
admin_permission = Permission(RoleNeed('admin'))
poster_permission = Permission(RoleNeed('poster'))
default_permission = Permission(RoleNeed('default'))

實現權限載入信號邏輯

  • jmilkfannsblog.__init__.py
def create_app(object_name):
    """Create the app instance via `Factory Method`"""

    app = Flask(__name__)
    # Set the config for app instance
    app.config.from_object(object_name)

    # Will be load the SQLALCHEMY_DATABASE_URL from config.py to db object
    db.init_app(app)
    # Init the Flask-Bcrypt via app object
    bcrypt.init_app(app)
    # Init the Flask-OpenID via app object
    openid.init_app(app)
    # Init the Flask-Login via app object
    login_manager.init_app(app)
    # Init the Flask-Prinicpal via app object
    principals.init_app(app)

    @identity_loaded.connect_via(app)
    def on_identity_loaded(sender, identity):
        """Change the role via add the Need object into Role. Need the access the app object. """

        # Set the identity user object
        identity.user = current_user

        # Add the UserNeed to the identity user object
        if hasattr(current_user, 'id'):
            identity.provides.add(UserNeed(current_user.id))

        # Add each role to the identity user object
        if hasattr(current_user, 'roles'):
            for role in current_user.roles:
                identity.provides.add(RoleNeed(role.name))

    # Register the Blueprint into app object
    app.register_blueprint(blog.blog_blueprint)
    app.register_blueprint(main.main_blueprint)

    return app
  • NOTE 1: 由於 identity_loaded 信號實現函數,須要訪問 app 對象, 因此直接在 __init\_\_.create_app() 中實現.cookie

  • NOTE 2: on_identity_loaded() 函數在用戶身份發生了變化, 須要重載權限的時候被調用. 首先將當前的用戶綁定到一個 Identity 的實例化對象中, 而後將該用戶 id 的 UserNeed 和該用戶所擁有的 roles 對應的 RoleNeed 綁定到該 Identity 中. 實現了將數據庫中 user 所擁有的 roles 都以 Needs 的形式綁定到其自身中.

實現身份改變信號邏輯

  • jmilkfsnsblog.controllers.main.py
from flask.ext.principal import Identity, AnonymousIdentity, identity_changed, current_app


@main_blueprint.route('/login', methods=['GET', 'POST'])
@openid.loginhandler
def login():
    """View function for login. Flask-OpenID will be receive the Authentication-information from relay party. """
...

    # Will be check the account whether rigjt.
    if form.validate_on_submit():

        # Using session to check the user's login status
        # Add the user's name to cookie.
        # session['username'] = form.username.data

        user = User.query.filter_by(username=form.username.data).one()

        # Using the Flask-Login to processing and check the login status for user
        # Remember the user's login status. 
        login_user(user, remember=form.remember.data)

        identity_changed.send(
            current_app._get_current_object(),
            identity=Identity(user.id))

        flash("You have been logged in.", category="success")
        return redirect(url_for('blog.home'))

...
  • NOTE 1: identity_changed通常在用戶的身份發生變化時發送, 因此咱們通常選擇 login()視圖函數中實現.

  • NOTE 2: identity_changed.send() 函數會將 sender: current_app._get_current_object() 當前應用對象 app 和身份對象 identity: Identity(user.id) 當前要登陸的用戶對象, 以信號的新式發送出去, 表示應用 app 對象中的 user 用戶對象的 identity 被改變了.

  • NOTE 3: 在 identity_changed 信息被髮送以後, 被裝飾器 identity_loaded.connect_via(app) 裝飾的函數 on_identity_loaded(sender, identity) 就會接受該信號, 併爲 user 綁定應有 Needs, 以此來賦予其權限.

NOTE 4: 在用戶認證經過後,Flask-Principal 會將用戶的身份(identity) 存儲在 session 中。

除了登陸的時候用戶身份會被改變, 登出也是同樣的.

@main_blueprint.route('/logout', methods=['GET', 'POST'])
def logout():
    """View function for logout."""

    # Remove the username from the cookie.
    # session.pop('username', None)

    # Using the Flask-Login to processing and check the logout status for user.
    logout_user()

    identity_changed.send(
        current_app._get_current_object(),
        identity=AnonymousIdentity())
    flash("You have been logged out.", category="success")
    return redirect(url_for('main.login'))

NOTE: 用戶登出系統後清理 session,Flask-Principal 會將用戶的身份變爲 AnonymousIdentity(匿名身份)。

實現只有文章做者才能編輯文章

  • jmilkfansblog.controllers.blog.py
@blog_blueprint.route('/edit/<string:id>', methods=['GET', 'POST'])
@login_required
@poster_permission.require(http_exception=403)
def edit_post(id):
    """View function for edit_post."""

    post = Post.query.get_or_404(id)

    # Ensure the user logged in.
    if not current_user:
        return redirect(url_for('main.login'))

    # Only the post onwer can be edit this post.
    if current_user != post.users:
        return redirect(url_for('blog.post', post_id=id))

    # 當 user 是 poster 或者 admin 時, 纔可以編輯文章
    # Admin can be edit the post.
    permission = Permission(UserNeed(post.users.id))
    if permission.can() or admin_permission.can():
        form = PostForm()

        #if current_user != post.users:
        # abort(403)

        if form.validate_on_submit():
            post.title = form.title.data
            post.text = form.text.data
            post.publish_date = datetime.now()

            # Update the post
            db.session.add(post)
            db.session.commit()

            return redirect(url_for('blog.post', post_id=post.id))
    else:
        abort(403)

    # Still retain the original content, if validate is false.
    form.title.data = post.title
    form.text.data = post.text
    return render_template('edit_post.html', form=form, post=post)

實現效果

  • 以具備 poster identity 的 jmilkfan_2016 登陸

  • 建立新的文章
    這裏寫圖片描述

  • jmilkfansblog.controllers.blog:edit_port()中打個斷點, 咱們來看看此時 permision 和 admin_permission 對象的值.

(Pdb) l
165             return redirect(url_for('blog.post', post_id=id))
166     
167         import pdb
168         pdb.set_trace()
169         # Admin can be edit the post.
170  ->     permission = Permission(UserNeed(post.users.id))
171         if permission.can() or admin_permission.can():
172             form = PostForm()
173     
174             #if current_user != post.users:
175             #    abort(403)
(Pdb) n
> /opt/JmilkFan-s-Blog/jmilkfansblog/controllers/blog.py(171)edit_post()
-> if permission.can() or admin_permission.can():
(Pdb) permission
<Permission needs=set([Need(method='id', value=u'b003f813-abfa-46d6-babc-2033b0b43f7e')]) excludes=set([])> (Pdb) permission.can() True

能夠看見 permission 對象所對應的 user id == b003f813-abfa-46d6-babc-2033b0b43f7e, 而該 user 在數據庫中對應的 roles == [87d180cc-bfa5-4c6a-87d4-01decb9c8649, 4b8b5c13-76fa-47e1-8403-623d284b2db7], 因此 user 在登陸時因爲其自身 Identity 的改變而觸發了 on_identity_loaded() 方法, 將 admin/poster 兩個 roles 對應的 RoleNeed 綁定到 user 自身的 identity 對象上, 從而擁有了編輯文章的權限.

這裏寫圖片描述

不然, 若是是匿名用戶想要編輯該文章的話就會觸發 403
這裏寫圖片描述

相關文章
相關標籤/搜索