Serverless 與 Flask 框架結合進行 Blog 開發

隨着時間的發展,Serverless 架構愈來愈火熱,其按量付費、彈性伸縮等諸多優質特性,讓人眼前一亮,不得不驚歎雲計算爲咱們帶來的便利。html

本實踐經過一個博客系統的開發,和你們簡單地體驗一下基於 Serverless 架構的博客系統是什麼樣的。前端

開發前的思考

  1. 博客系統須要哪些功能?本文僅僅是 demo 性質,因此功能比較少,只有兩個頁面。具備文章管理、分類管理、標籤管理以及留言管理等功能。同時爲了方便用戶管理,要有前臺和後臺兩部分。
  2. 前臺如何作?前臺多是用戶流量比較大的(相對後臺而言),因此這部分就是用單獨的函數。每一個功能一個函數,初步判斷前臺可能須要:獲取文章分類,獲取文章列表,獲取評論列表,增長評論,獲取標籤列表等接口。
  3. 後臺如何作?後臺理論上是管理員的專屬地盤,因此這一部分流量比較小,能夠經過 flask-admin,放入到一個函數中來解決。
  4. 爲何前臺要那麼多函數,後臺用一個框架?整個項目就用一個框架很差麼?首先要回答,整個項目用一個框架也是能夠的,可是並很差。例如這個項目的後臺,使用的是 Flask 框架,用了 flask-admin 來作後臺管理,這個開發過程很簡單,可能整個後臺就一百來行代碼就搞定了,可是這涉及到:python

    • 網頁的返回,須要 APIGW 開啓響應集成,響應集成的性能其實不好,因此相對來講,不太適合放在前端;
    • 一個完整項目比較大,可能須要的資源也會更多,那麼咱們就須要給這個函數更多的資源內存,可能會致使收費的增長,例如個人後臺給的資源是 1024,個人前端每一個函數給的內存資源是 128/256,在執行一樣時間的時候,明顯後者的費用下降了 4~8 倍。一樣,函數可能涉及大冷啓動,冷啓動一個函數和冷啓動函數中的一個完整的框架/項目,前者的速度和性能可能會更好一下;
    • 函數都有併發上限的,若是全部的資源全都請求到一個函數,那麼極可能實際用戶併發幾個的時候,對用的函數併發就多是幾十幾百,這極可能在用戶稍微多一點的狀況下,就會觸及用戶實例的上限限制,後臺功能是非頻繁功能,前臺相對來講是更頻繁的,因此前臺是用單獨接口更合理。
  5. 登錄功能怎麼作?很是抱歉,函數並不能像傳統開發,將客戶的一些登陸信息緩存到機器上,可是客戶端依舊可使用 cookie,因此利用這個方法,能夠作如下流程:mysql

    • 後臺登陸入口處,拉取 APIGW 傳過來的 APIGW Event,看其中 headers/cookie 是否存在,不存在就會返回登陸頁面;
    • 若是 headers/cookie 存在,取 cookie 中的 token 字段,判斷 token 字段是否和服務端的 token 字段吻合,吻合進入系統後臺,不吻合返回登陸頁面
    • 用戶登陸,請求後臺的登錄功能,若是帳號密碼正確,則返回給用戶一個 token,客戶端將 token 記錄到 cookie 中
  • 問題來了:git

    • token 是什麼?Token 能夠認爲是一個登陸憑證,生成方法能夠按照本身設計升級,本實踐比較簡單,就直接用帳號密碼組合,而後 md5。
    • token 存在那裏?下次如何獲取?Token 能夠存在 Mysql 數據庫中,也能夠存在 Redis 中,甚至能夠存在 COS 中,例如 Redis 和 COS,均可以利用其自身的一些特性作一些額外的操做,例如數據有效期(用來作登陸過時等)。固然本文不想作的那麼麻煩,因此每次用戶請求過來,都是單獨計算 token,而後進行的對比。
    • 這種 token 登錄方法能夠用於其餘項目麼?仍是僅適用於這種博客系統。能夠適用其餘項目,不少項目均可以經過這種方法來作,例如我本身的 Anycodes,也是經過 Token 進行鑑權,只不過在 Serverless 架構下,Token 如何存儲是一個問題,可是我我的推薦有錢就用 redis,沒錢就用 cos,不想額外花錢就像我,每次是用單獨對比。
    • token 存在 redis 能夠理解,可是存在 cos 是爲何?cos 自己是對象存儲,用來存儲文件的,其實徹底能夠用來存儲 token,例如咱們每次生成一個新的 token,都把這個 token 設置爲一個文件,文件內容就是這個 token 對應的用戶信息或者是權限信息,或者其餘的信息,而後存儲桶策略設置成文件過時時間,例如文件存入 1 天自動刪除,那麼 1 天以後,你存儲的這個 token 文件就會被刪除。等用戶帶着 token 過來的時候,直接經過內網請求 cos(沒有流量費)獲取指定文件名,若是獲取到了就下載回來(文件通常也就 1K 或者如下),而後進行其餘操做,不存在就證實用戶已過時,或者 token 錯誤,讓他從新登陸就行了。固然,這種方法可能不是最優解,可是確實是在 Serverless 條件下的一個有趣的作法。能夠在小項目中嘗試使用。
  1. 項目本地開發如何進行調試?衆所周知 Serverless 架構的本地調試很難。確實如此,雖說本地調試很困難,但也不是不能越過去的,能夠根據項目本身的需求,來作一些調試策略。

項目開發

項目開發過程主要就是數據庫的增刪改查,爲了更加適應 Serverless 架構下的項目開發,也爲了提升項目的開發效率特總結了相關的開發技巧和經驗。github

數據庫設計

因爲是作一個簡單的博客,因此數據庫相對設計比較簡單,只有文章表、分類表以及標籤表、評論表等,總體的 ER 圖以下所示:web

ER 圖

本地開發與調試

對於開發調試,我在每一個函數後面增長了對應觸發器的調試方案,例如 APIGW 觸發器,我增長了如下代碼:redis

def test():
    event = {
        "requestContext": {
            "serviceId": "service-f94sy04v",
            "path": "/test/{path}",
            "httpMethod": "POST",
            "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef",
            "identity": {
                "secretId": "abdcdxxxxxxxsdfs"
            },
            "sourceIp": "14.17.22.34",
            "stage": "release"
        },
        "headers": {
            "Accept-Language": "en-US,en,cn",
            "Accept": "text/html,application/xml,application/json",
            "Host": "service-3ei3tii4-251000691.ap-guangzhou.apigateway.myqloud.com",
            "User-Agent": "User Agent String"
        },
        "body": json.dumps({"id": 1}),
         .... .... 
    }
    print(main_handler(event, None))


if __name__ == "__main__":
    test()

在實際上,我每次想要看一下運行效果,我都會執行這個文件:sql

{'id': 1, 'title': '', 'watched': 1, 'category': '熱點新聞', 'publish': '2020-02-13 00:45:52', 'tags': [], 'next': {}, 'pre': {}}
{'uuid': '749ca9f6-4dfb-11ea-9c5b-acde48001122', 'error': False, 'message': ''}

能夠認爲,是在經過本地模擬一些線上環境。固然,若是有 redis 等一些須要內網資源的函數,就比較麻煩,可是我這作法,能夠用於絕大部分函數。包括後臺的 Flaks 框架部分:數據庫

def test():
    event = {'body': 'name=sdsadasdsadasd&remark=', 'headerParameters': {}, 'headers': {
        'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',
        'accept-encoding': 'gzip, deflate', 'accept-language': 'zh-CN,zh;q=0.9', 'cache-control': 'no-cache',
        'connection': 'keep-alive', 'content-length': '27', 'content-type': 'application/x-www-form-urlencoded',
        'cookie': 'Hm_lvt_a0c900918361b31d762d9cf4dc81ee5b=1574491278,1575257377', 'endpoint-timeout': '15',
        'host': 'blog.0duzhan.com', 'origin': 'http://blog.0duzhan.com', 'pragma': 'no-cache',
        'proxy-connection': 'keep-alive', 'referer': 'http://blog.0duzhan.com/admin/tag/new/?url=%2Fadmin%2Ftag%2F',
        'upgrade-insecure-requests': '1',
        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36',
        'x-anonymous-consumer': 'true', 'x-api-requestid': '656622f3b008a0d406a376809b03b52c',
        'x-b3-traceid': '656622f3b008a0d406a376809b03b52c', 'x-qualifier': '$LATEST'}, 'httpMethod': 'POST',
             'path': '/admin/tag/new/', 'pathParameters': {}, 'queryString': {'url': '/admin/tag/'},
             'queryStringParameters': {},
             'requestContext': {'httpMethod': 'ANY', 'identity': {}, 'path': '/admin', 'serviceId': 'service-23ybmuq7',
                                'sourceIp': '119.123.224.87', 'stage': 'release'}}
    print(main_handler(event, None))


if __name__ == "__main__":
    test()

index 執行結果:

{'body': 'name=sdsadasdsadasd&remark=', 'headerParameters': {}, 'headers': {'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', 'accept-encoding': 'gzip, deflate', 'accept-language': 'zh-CN,zh;q=0.9', 'cache-control': 'no-cache', 'connection': 'keep-alive', 'content-length': '27', 'content-type': 'application/x-www-form-urlencoded', 'cookie': 'Hm_lvt_a0c900918361b31d762d9cf4dc81ee5b=1574491278,1575257377', 'endpoint-timeout': '15', 'host': 'blog.0duzhan.com', 'origin': 'http://blog.0duzhan.com', 'pragma': 'no-cache', 'proxy-connection': 'keep-alive', 'referer': 'http://blog.0duzhan.com/admin/tag/new/?url=%2Fadmin%2Ftag%2F', 'upgrade-insecure-requests': '1', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36', 'x-anonymous-consumer': 'true', 'x-api-requestid': '656622f3b008a0d406a376809b03b52c', 'x-b3-traceid': '656622f3b008a0d406a376809b03b52c', 'x-qualifier': '$LATEST'}, 'httpMethod': 'POST', 'path': '/admin/tag/new/', 'pathParameters': {}, 'queryString': {'url': '/admin/tag/'}, 'queryStringParameters': {}, 'requestContext': {'httpMethod': 'ANY', 'identity': {}, 'path': '/admin', 'serviceId': 'service-23ybmuq7', 'sourceIp': '119.123.224.87', 'stage': 'release'}}
{'isBase64Encoded': False, 'statusCode': 200, 'headers': {'Content-Type': 'text/html'}, 'body': '<!DOCTYPE html>\n<html lang="en">\n<head>\n    <meta charset="UTF-8">\n    <title>Title</title>\n    <script>\n        var url = window.location.href\n        url = url.split("admin")[0] + "admin"\n        String.prototype.endWith = function (s) {\n            var d = this.length - s.length;\n            return (d >= 0 && this.lastIndexOf(s) == d)\n        }\n        if (window.location.href != url) {\n            if (!window.location.href.endsWith("admin") || !window.location.href.endsWith("admin/"))\n                window.location = url\n        }\n\n        function doLogin() {\n            var xmlhttp = window.XMLHttpRequest ? (new XMLHttpRequest()) : (new ActiveXObject("Microsoft.XMLHTTP"))\n            xmlhttp.onreadystatechange = function () {\n                if (xmlhttp.readyState == 4 && xmlhttp.status == 200) {\n                    if (JSON.parse(xmlhttp.responseText)["token"]) {\n                        document.cookie = "token=" + JSON.parse(xmlhttp.responseText)["token"];\n                        window.location = `http://${window.location.host}/admin`\n                    } else {\n                        alert(JSON.parse(xmlhttp.responseText)["message"])\n                    }\n                }\n            }\n            xmlhttp.open("POST", window.location.pathname, true);\n            xmlhttp.setRequestHeader("Content-type", "application/json");\n            xmlhttp.send(JSON.stringify({\n                "username": document.getElementById("username").value,\n                "password": document.getElementById("password").value,\n            }));\n        }\n    </script>\n</head>\n<body>\n\n<center><h1>Serverless Blog 後臺管理</h1>\n    管理帳號:<input type="text" id="username"><br>\n    管理密碼:<input type="password" id="password"><br>\n    <input type="reset"><input type="submit" onclick="doLogin()"><br>\n</center>\n</body>\n</html>'}

Flask部署

Flask 部署到 Serverless 架構能夠用 @serverless/tencent-flask,可是這裏爲了更加深刻了解傳統框架如何部署到 Serverless 架構,因此此處自行「造輪子」實現,先來看一張圖:

在一般狀況下,咱們使用 Flask 等框架實際上要經過 web_server,進入到下一個環節,而咱們雲函數更可能是一個函數,本不須要啓動 web server,因此咱們就能夠直接調用 wsgi_app 這個方法,其中這裏的 environ 就是咱們剛纔的經過對 event/context 等進行處理後的對象,start_response 能夠認爲是咱們的一種特殊的數據結構,例如咱們的 response 結構形態等。因此,若是咱們本身想要實現這個過程,不使用騰訊雲 flask-component,能夠這樣作:

# -*- coding: utf-8 -*-
# Copyright 2016 Matt Martz
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

import sys
import json

try:
    from urllib import urlencode
except ImportError:
    from urllib.parse import urlencode

from flask import Flask

try:
    from cStringIO import StringIO
except ImportError:
    try:
        from StringIO import StringIO
    except ImportError:
        from io import StringIO

from werkzeug.wrappers import BaseRequest

__version__ = '0.0.4'


def make_environ(event):
    environ = {}

    for hdr_name, hdr_value in event['headers'].items():
        hdr_name = hdr_name.replace('-', '_').upper()
        if hdr_name in ['CONTENT_TYPE', 'CONTENT_LENGTH']:
            environ[hdr_name] = hdr_value
            continue

        http_hdr_name = 'HTTP_%s' % hdr_name
        environ[http_hdr_name] = hdr_value

    apigateway_qs = event['queryStringParameters']
    request_qs = event['queryString']
    qs = apigateway_qs.copy()
    qs.update(request_qs)

    body = ''
    if 'body' in event:
        body = event['body']

    environ['REQUEST_METHOD'] = event['httpMethod']
    environ['PATH_INFO'] = event['path']
    environ['QUERY_STRING'] = urlencode(qs) if qs else ''
    environ['REMOTE_ADDR'] = 80
    environ['HOST'] = event['headers']['host']
    environ['SCRIPT_NAME'] = ''

    environ['SERVER_PORT'] = 80
    environ['SERVER_PROTOCOL'] = 'HTTP/1.1'

    environ['CONTENT_LENGTH'] = str(len(body))

    environ['wsgi.url_scheme'] = ''
    environ['wsgi.input'] = StringIO(body)
    environ['wsgi.version'] = (1, 0)
    environ['wsgi.errors'] = sys.stderr
    environ['wsgi.multithread'] = False
    environ['wsgi.run_once'] = True
    environ['wsgi.multiprocess'] = False

    BaseRequest(environ)

    return environ


class LambdaResponse(object):
    def __init__(self):
        self.status = None
        self.response_headers = None

    def start_response(self, status, response_headers, exc_info=None):
        self.status = int(status[:3])
        self.response_headers = dict(response_headers)


class FlaskLambda(Flask):
    def __call__(self, event, context):
        if 'httpMethod' not in event:
            print('httpMethod not in event')
            # In this "context" `event` is `environ` and
            # `context` is `start_response`, meaning the request didn't
            # occur via API Gateway and Lambda
            return super(FlaskLambda, self).__call__(event, context)

        response = LambdaResponse()
        # print response.start_response

        body = next(self.wsgi_app(
            make_environ(event),
            response.start_response
        ))

        # return {
        # "isBase64Encoded": False,
        # "statusCode": 200,
        # "headers": {'Content-Type': 'text/html'},
        # "body": body
        # }

        return {
            'statusCode': response.status,
            'headers': response.response_headers,
            'body': body
        }

這個代碼,能夠將 APIGW 過來的請求,變成請求集成的形式,傳送給 Flask 框架,用戶能夠經過 request.form 來獲取 post 內容,經過 request.args 獲取 get 內容等。

全局變量

全局變量可能包括用戶帳號,密碼,雲的密鑰信息,數據庫信息等,爲了統一配置和修改,可使用我本身寫的全局變量組件:

# 函數們的總體配置信息
Conf:
  component: "serverless-global"
  inputs:
    region: ap-shanghai
    runtime: Python3.6
    handler: index.main_handler
    include_common: ./common
    blog_user: Dfounder
    blog_email: service@anycodes.cn
    blog_about_me: 這就是個人博客
    blog_host: blog.0duzhan.com
    website_title: Serverless Blog System
    website_keywords: Serverless, Serverless Framework, Tencent Cloud, SCF
    website_description: 一款基於騰訊雲Serverless架構,而且採用Serverless Framework構建的Serverless博客系統。
    website_bucket: serverless-blog-1256773370
    mysql_host: 
    mysql_user: root
    mysql_password: 
    mysql_port: 60510
    mysql_db: serverless_blog_system
    admin_user: mytest
    admin_password: mytestabc
    tencent_secret_id: 
    tencent_secret_key: 
    tencent_appid:

在使用的時候,能夠直接用,例如函數:

Blog_Web_addComment:
  component: "@serverless/tencent-scf"
  inputs:
    name: Blog_Web_addComment
    description: 添加評論
    codeUri: ./cloudFunctions/addComment
    handler: ${Conf.handler}
    runtime: ${Conf.runtime}
    region:  ${Conf.region}
    include:
      - ${Conf.include_common}
    environment:
      variables:
        mysql_host: ${Conf.mysql_host}
        mysql_port: ${Conf.mysql_port}
        mysql_user: ${Conf.mysql_user}
        mysql_password: ${Conf.mysql_password}
        mysql_db: ${Conf.mysql_db}

項目初始化

爲了讓項目更容易初始化,例如我修改網站的名字,描述,關鍵詞,或者我須要創建數據庫等。因此這個時候我單獨作了一個 init 文件:

# -*- coding: utf8 -*-
import pymysql
import shutil
import yaml
import os


def setEnv():
    try:
        file = open("./serverless.yaml", 'r', encoding="utf-8")
        file_data = file.read()
        file.close()

        data = yaml.load(file_data)
        for eveKey, eveValue in data['Conf']['inputs'].items():
            os.environ[eveKey] = str(eveValue)
        return True
    except Exception as e:
        raise e


def initDb():
    try:
        conn = pymysql.connect(host=os.environ.get('mysql_host'),
                               user=os.environ.get('mysql_user'),
                               password=os.environ.get('mysql_password'),
                               port=int(os.environ.get('mysql_port')),
                               charset='utf8')
        cursor = conn.cursor()
        sql = "CREATE DATABASE IF NOT EXISTS {db_name}".format(db_name=os.environ.get('mysql_db'))
        cursor.execute(sql)
        cursor.close()
        conn.close()
        return True
    except Exception as e:
        raise e


def initTable():
    try:
        conn = pymysql.connect(host=os.environ.get('mysql_host'),
                               user=os.environ.get('mysql_user'),
                               password=os.environ.get('mysql_password'),
                               port=int(os.environ.get('mysql_port')),
                               db=os.environ.get('mysql_db'),
                               charset='utf8',
                               cursorclass=pymysql.cursors.DictCursor,
                               autocommit=1)
        cursor = conn.cursor()
        createTags = "CREATE TABLE `tags` ( `tid` INT NOT NULL AUTO_INCREMENT , `name` VARCHAR(255) NOT NULL , `remark` TEXT NULL , PRIMARY KEY (`tid`), UNIQUE (`name`)) ENGINE = InnoDB;"
        createCategory = "CREATE TABLE `category` ( `cid` INT NOT NULL AUTO_INCREMENT , `name` VARCHAR(255) NOT NULL , `sorted` INT NOT NULL DEFAULT '1' , `remark` TEXT NULL , PRIMARY KEY (`cid`), UNIQUE (`name`)) ENGINE = InnoDB;"
        createComments = "CREATE TABLE `comments` ( `cid` INT NOT NULL AUTO_INCREMENT , `content` TEXT NOT NULL , `publish` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP , `user` VARCHAR(255) NOT NULL , `email` VARCHAR(255) NULL , `photo` INT NOT NULL DEFAULT '0' ,  `article` INT NOT NULL , `remark` TEXT NULL , `uni_mark` VARCHAR(255) NOT NULL , `is_show` INT NOT NULL DEFAULT '0' , PRIMARY KEY (`cid`), UNIQUE (`uni_mark`)) ENGINE = InnoDB;"
        createArticle = "CREATE TABLE `article` ( `aid` INT NOT NULL AUTO_INCREMENT , `title` VARCHAR(255) NOT NULL , `content` TEXT NOT NULL , `description` TEXT NOT NULL , `publish` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP , `watched` INT NOT NULL DEFAULT '0' , `category` INT NOT NULL , `remark` TEXT NULL , PRIMARY KEY (`aid`)) ENGINE = InnoDB;"
        createArticleTags = "CREATE TABLE `article_tags` ( `atid` INT NOT NULL AUTO_INCREMENT , `aid` INT NOT NULL , `tid` INT NOT NULL , PRIMARY KEY (`atid`)) ENGINE = InnoDB;"
        alertArticleTagsArticle = "ALTER TABLE `article_tags` ADD CONSTRAINT `article` FOREIGN KEY (`aid`) REFERENCES `article`(`aid`) ON DELETE CASCADE ON UPDATE CASCADE; "
        alertArticleTagsTags = "ALTER TABLE `article_tags` ADD CONSTRAINT `tags` FOREIGN KEY (`tid`) REFERENCES `tags`(`tid`) ON DELETE CASCADE ON UPDATE CASCADE;"
        alertArticleCategory = "ALTER TABLE `article` ADD CONSTRAINT `category` FOREIGN KEY (`category`) REFERENCES `category`(`cid`) ON DELETE CASCADE ON UPDATE CASCADE;"
        alertCommentsArticle = "ALTER TABLE `comments` ADD CONSTRAINT `article_comments` FOREIGN KEY (`article`) REFERENCES `article`(`aid`) ON DELETE CASCADE ON UPDATE CASCADE;"
        cursor.execute(createTags)
        cursor.execute(createCategory)
        cursor.execute(createComments)
        cursor.execute(createArticle)
        cursor.execute(createArticleTags)
        cursor.execute(alertArticleTagsArticle)
        cursor.execute(alertArticleTagsTags)
        cursor.execute(alertArticleCategory)
        cursor.execute(alertCommentsArticle)
        cursor.close()
        conn.close()
        return True
    except Exception as e:
        raise e


def initHTML():
    try:
        tempPath = "website"
        tempDist = os.path.join(tempPath, "dist")
        if os.path.exists(tempDist):
            shutil.rmtree(tempDist)
        tempFileList = []
        for eve in os.walk(tempPath):
            if eve[2]:
                for eveFile in eve[2]:
                    tempFileList.append(os.path.join(eve[0], eveFile))
        os.mkdir(tempDist)
        for eve in tempFileList:
            temp = os.path.split(eve.replace(tempPath, tempDist))
            if not os.path.exists(temp[0]):
                os.makedirs(temp[0])
            if eve.endswith(".html") or eve.endswith(".htm"):
                with open(eve) as readData:
                    with open(eve.replace(tempPath, tempDist), "w") as writeData:
                        writeData.write(readData.read().
                                        replace('{{ user }}', os.environ.get('blog_user')).
                                        replace('{{ email }}', os.environ.get('blog_email')).
                                        replace('{{ title }}', os.environ.get('website_title')).
                                        replace('{{ keywords }}', os.environ.get('website_keywords')).
                                        replace('{{ about_me }}', os.environ.get('blog_about_me')).
                                        replace('{{ host }}', os.environ.get('blog_host')).
                                        replace('{{ description }}', os.environ.get('website_description')))
            else:
                shutil.copy(eve, eve.replace(tempPath, tempDist))
        return True
    except Exception as e:
        raise e


if __name__ == "__main__":
    print("獲取Yaml數據: ", setEnv())
    print("創建數據庫:", initDb())
    print("創建數據庫:", initTable())
    print("初始化HTML:", initHTML())

公共組件的開發

在項目中會有不少公共組件,例如數據庫的部分,因此我把數據庫的代碼,統一放到了一塊兒:common/mysqlCommon.py:

# -*- coding: utf8 -*-

import os
import re
import pymysql
import hashlib
from random import choice


class mysqlCommon:
    def __init__(self):
        self.getConnection({
            "host": os.environ.get('mysql_host'),
            "user": os.environ.get('mysql_user'),
            "port": int(os.environ.get('mysql_port')),
            "db": os.environ.get('mysql_db'),
            "password": os.environ.get('mysql_password')
        })

    def getDefaultPic(self):
        return choice([
            'http://t8.baidu.com/it/u=1484500186,1503043093&fm=79&app=86&f=JPEG?w=1280&h=853',
            'http://t8.baidu.com/it/u=2247852322,986532796&fm=79&app=86&f=JPEG?w=1280&h=853',
            'http://t7.baidu.com/it/u=3204887199,3790688592&fm=79&app=86&f=JPEG?w=4610&h=2968',
            'http://t9.baidu.com/it/u=3363001160,1163944807&fm=79&app=86&f=JPEG?w=1280&h=830',
            'http://t9.baidu.com/it/u=583874135,70653437&fm=79&app=86&f=JPEG?w=3607&h=2408',
            'http://t9.baidu.com/it/u=583874135,70653437&fm=79&app=86&f=JPEG?w=3607&h=2408',
            'http://t9.baidu.com/it/u=1307125826,3433407105&fm=79&app=86&f=JPEG?w=5760&h=3240',
            'http://t9.baidu.com/it/u=2268908537,2815455140&fm=79&app=86&f=JPEG?w=1280&h=719',
            'http://t7.baidu.com/it/u=1179872664,290201490&fm=79&app=86&f=JPEG?w=1280&h=854',
            'http://t9.baidu.com/it/u=3949188917,63856583&fm=79&app=86&f=JPEG?w=1280&h=875',
            'http://t9.baidu.com/it/u=2266751744,4253267866&fm=79&app=86&f=JPEG?w=1280&h=854',
            'http://t8.baidu.com/it/u=4100756023,1345858297&fm=79&app=86&f=JPEG?w=1280&h=854',
            'http://t7.baidu.com/it/u=1355385882,1155324943&fm=79&app=86&f=JPEG?w=1280&h=854',
            'http://t9.baidu.com/it/u=2292037961,3689236171&fm=79&app=86&f=JPEG?w=1280&h=854',
            'http://t9.baidu.com/it/u=4241966675,2405819829&fm=79&app=86&f=JPEG?w=1280&h=854',
            'http://t8.baidu.com/it/u=2857883419,1187496708&fm=79&app=86&f=JPEG?w=1280&h=763',
            'http://t8.baidu.com/it/u=198337120,441348595&fm=79&app=86&f=JPEG?w=1280&h=732'
        ])

    def getConnection(self, conf):
        self.connection = pymysql.connect(host=conf['host'],
                                          user=conf['user'],
                                          password=conf['password'],
                                          port=int(conf['port']),
                                          db=conf['db'],
                                          charset='utf8',
                                          cursorclass=pymysql.cursors.DictCursor,
                                          autocommit=1)

    def doAction(self, stmt, data):
        try:
            self.connection.ping(reconnect=True)
            cursor = self.connection.cursor()
            cursor.execute(stmt, data)
            result = cursor
            cursor.close()
            return result
        except Exception as e:
            print(e)
            try:
                cursor.close()
            except:
                pass
            return False

    def getCategoryList(self):
        search_stmt = (
            "SELECT * FROM `category` ORDER BY `sorted`"
        )
        result = self.doAction(search_stmt, ())
        if result == False:
            return False
        return [{"id": eveCategory['cid'], "name": eveCategory['name']} for eveCategory in result.fetchall()]

    def getArticleList(self, category, tag, page=1):
        if category:
            search_stmt = (
                "SELECT article.*,category.name FROM `article` LEFT JOIN `category` ON article.category=category.cid WHERE article.category=%s ORDER BY -article.aid LIMIT %s,%s;"
            )
            count_stmt = (
                "SELECT COUNT(*) FROM `article` LEFT JOIN `category` ON article.category=category.cid WHERE article.category=%s;"
            )
            data = (category, 10 * (int(page) - 1), 10 * int(page))
            count_data = (category,)
        elif tag:
            search_stmt = (
                "SELECT article.* FROM `article` LEFT JOIN `article_tags` ON article.aid=article_tags.aid WHERE article_tags.tid=%s ORDER BY -article.aid LIMIT %s,%s;"
            )
            count_stmt = (
                "SELECT COUNT(*) FROM `article`LEFT JOIN `article_tags` ON article.aid=article_tags.aid WHERE article_tags.tid=%s;"
            )
            data = (tag, 10 * (int(page) - 1), 10 * int(page))
            count_data = (tag,)
        else:
            search_stmt = (
                "SELECT article.*,category.name FROM `article` LEFT JOIN `category` ON article.category=category.cid ORDER BY -article.aid LIMIT %s,%s;"
            )
            count_stmt = (
                "SELECT COUNT(*) FROM `article` LEFT JOIN `category` ON article.category=category.cid; "
            )
            data = (10 * (int(page) - 1), 10 * int(page))
            count_data = ()
        result = self.doAction(search_stmt, data)
        if result == False:
            return False

        return {"data": [{"id": eveArticle['aid'],
                          "title": eveArticle['title'],
                          "description": eveArticle['description'],
                          "watched": eveArticle['watched'],
                          "category": eveArticle['category'],
                          "publish": str(eveArticle['publish']),
                          "picture": self.getPicture(eveArticle['content'])}
                         for eveArticle in result.fetchall()],
                "count": self.doAction(count_stmt, count_data).fetchone()["COUNT(*)"]}

    def getHotArticleList(self):
        search_stmt = (
            "SELECT article.*,category.name FROM `article` LEFT JOIN `category` ON article.category=category.cid ORDER BY article.watched LIMIT 0,5"
        )
        result = self.doAction(search_stmt, ())
        if result == False:
            return False
        return [{"id": eveArticle['aid'],
                 "title": eveArticle['title'],
                 "description": eveArticle['description'],
                 "watched": eveArticle['watched'],
                 "category": eveArticle['category'],
                 "publish": str(eveArticle['publish']),
                 "picture": self.getPicture(eveArticle['content'])}
                for
                eveArticle in result.fetchall()]

    def getTagsArticle(self, aid):
        search_stmt = (
            "SELECT tags.name, tags.tid FROM `article_tags` LEFT JOIN `tags` ON article_tags.tid=tags.tid WHERE article_tags.aid=%s;"
        )
        result = self.doAction(search_stmt, (aid,))
        if result == False:
            return False
        return [{"id": eveTag["tid"], "name": eveTag["name"]} for eveTag in result.fetchall()]

    def getTagsList(self):
        search_stmt = (
            "SELECT * FROM tags ORDER BY RAND() LIMIT 20; "
        )
        result = self.doAction(search_stmt, ())
        if result == False:
            return False
        return [{"id": eveTag['tid'], "name": eveTag['name']} for eveTag in result.fetchall()]

    def getArticleContent(self, aid):
        search_stmt = (
            "SELECT article.*, category.name FROM `category` LEFT JOIN `article` ON category.cid=article.category WHERE article.aid=%s;"
        )
        result = self.doAction(search_stmt, (aid))
        if result == False:
            return False
        article = result.fetchone()
        return {
            "id": article["aid"],
            "title": article["title"],
            "content": article["content"],
            "description": article["description"],
            "watched": article["watched"],
            "category": article["name"],
            "publish": str(article["publish"]),
            "tags": self.getTagsArticle(article["aid"]),
            "next": self.getOtherArticle(aid, "next"),
            "pre": self.getOtherArticle(aid, "pre")
        } if article else {}

    def getOtherArticle(self, aid, articleType):
        search_stmt = (
            "SELECT * FROM `article` WHERE aid=(select max(aid) from `article` where aid>%s)"
        ) if articleType == "next" else (
            "SELECT * FROM `article` WHERE aid=(select max(aid) from `article` where aid<%s)"
        )
        result = self.doAction(search_stmt, (aid))
        if result == False:
            return False
        article = result.fetchone()
        return {
            "id": article["aid"],
            "title": article["title"]
        } if article else {}

    def getComments(self, aid):
        search_stmt = (
            "SELECT * FROM `comments` WHERE article=%s AND is_show=1 ORDER BY -cid LIMIT 100;"
        )
        result = self.doAction(search_stmt, (aid))
        if result == False:
            return False
        return [{"content": eveComment['content'],
                 "publish": str(eveComment['publish']),
                 "user": eveComment['user'],
                 "remark": eveComment['remark']} for eveComment in result.fetchall()]

    def addComment(self, content, user, email, aid):
        insert_stmt = (
            "INSERT INTO `comments` (`cid`, `content`, `publish`, `user`, `email`, `article`, `uni_mark`) "
            "VALUES (NULL, %s, CURRENT_TIMESTAMP, %s, %s, %s, %s)"
        )
        result = self.doAction(insert_stmt, (content, user, email, aid, hashlib.md5(
            ("%s----%s----%s----%s" % (str(content), str(user), str(email), str(aid))).encode("utf-8")).hexdigest()))
        return False if result == False else True

    def updateArticleWatched(self, wid):
        update_stmt = (
            "UPDATE `article` SET `watched`=`watched`+1 WHERE `aid` = %s"
        )
        return False if self.doAction(update_stmt, (wid)) == False else True

    def getPicture(self, content):
        resultList =[eve[1] for eve in re.findall('<img(.*?)src="(.*?)"(.*?)>', content)]
        return resultList[0] if resultList else self.getDefaultPic()


    def getTag(self, tag):
        search_stmt = (
            "SELECT * FROM `tags` WHERE name=%s;"
        )
        result = self.doAction(search_stmt, (tag,))
        return False if not result or result.rowcount == 0 else result.fetchone()['tid']

    def addTag(self, tag):
        insert_stmt = (
            "INSERT INTO `tags` (`tid`, `name`, `remark`) "
            "VALUES (NULL, %s, NULL)"
        )
        result = self.doAction(insert_stmt, (tag))
        return False if result == False else result.lastrowid

    def addArticleTag(self, article, tag):
        insert_stmt = (
            "INSERT INTO `article_tags` (`atid`, `aid`, `tid`) "
            "VALUES (NULL, %s, %s)"
        )
        result = self.doAction(insert_stmt, (article, tag))
        return False if result == False else True

這裏基本上是,這個項目須要的數據庫增刪改查的所有功能(admin 除外),在使用的時候,分爲本地和線上:

try:
    import returnCommon
    from mysqlCommon import mysqlCommon
except:
    import common.testCommon

    common.testCommon.setEnv()

    import common.returnCommon as returnCommon
    from common.mysqlCommon import mysqlCommon

mysql = mysqlCommon()

經過 python 的異常,若是導入沒找到,那就說明是本地測試,若是 from mysqlCommon import mysqlCommon 找到了,那就說明是線上環境。除了數據庫的公共組件,我還有 returnCommon 等公共文件。固然, 這些文件,在使用的時候也須要打包進入,能夠在 yaml 中增長 include,例如:

Blog_Web_addComment:
  component: "@serverless/tencent-scf"
  inputs:
    name: Blog_Web_addComment
    description: 添加評論
    codeUri: ./cloudFunctions/addComment
    handler: ${Conf.handler}
    runtime: ${Conf.runtime}
    region:  ${Conf.region}
    include:
      - ${Conf.include_common}

功能展現

前臺功能

  • 列表頁

列表頁

  • 內容頁

內容頁

後臺功能

  • 登陸功能

登陸功能

  • 列表頁

列表頁

  • 表單頁

表單頁

項目部署

  • 配置 serverless.yaml
# 函數們的總體配置信息
Conf:
  component: "serverless-global"
  inputs:
    region: ap-shanghai
    runtime: Python3.6
    handler: index.main_handler
    include_common: ./common
    blog_user: Dfounder
    blog_email: service@anycodes.cn
    website_title: Serverless Blog System
    website_keywords: Serverless, Serverless Framework, Tencent Cloud, SCF
    website_description: 一款基於騰訊雲Serverless架構,而且採用Serverless Framework構建的Serverless博客系統。
    website_bucket: serverless-blog-1256773370
    mysql_host: 
    mysql_password: 
    mysql_port: 
    mysql_db: 
    admin_user: mytest
    admin_password: mytest

除了上面的內容,還要看一下域名問題(例如 CosBucket):

# 網站
CosBucket:
  component: '@serverless/tencent-website'
  inputs:
    code:
      root: website/dist
      src: ./
      index: list.html
    region:  ${Conf.region}
    bucketName: ${Conf.website_bucket}
    hosts:
      - host: 0duzhan.com
        https:
          certId: awPsOIHY
          forceSwitch: -1
      - host: www.0duzhan.com
        https:
          certId: awPsOIHY
          forceSwitch: -1

    env:
      apiUrl: ${APIService.subDomain}

以及 API 網關內容:

# 建立 API 網關 Service
APIService:
  component: "@serverless/tencent-apigateway"
  inputs:
    region: ${Conf.region}
    customDomain:
      - domain: api.0duzhan.com
        isDefaultMapping: 'FALSE'
        pathMappingSet:
          - path: /
            environment: release
        protocols:
          - http
    protocols:
      - http
      - https
    ........

這兩部分域名能夠修改爲本身的,或者刪除掉這兩個 key

  • 執行init.py:

這裏要注意,我是在 macOS 下開發的,init.py 能夠在 macOS/Linux 運行,Windows 用戶可能要適當修改一下。還有這裏面須要一個依賴:pyyaml,須要自行安裝一下。

獲取Yaml數據:  True
創建數據庫: True
創建數據庫: True
初始化HTML: True
  • 部署資源,執行 serverless --debug
(venv) ServerlessBlog:ServerlessBlog dfounderliu$ sls --debug

  DEBUG ─ Resolving the template's static variables.
  DEBUG ─ Collecting components from the template.
  DEBUG ─ Downloading any NPM components found in the template.
  DEBUG ─ Analyzing the template's components dependencies.
  DEBUG ─ Creating the template's components graph.
  DEBUG ─ Syncing template state.
  DEBUG ─ Executing the template's components graph.
  DEBUG ─ Preparing website Tencent COS bucket serverless-blog-1256773370.
  DEBUG ─ Starting API-Gateway deployment with name APIService in the ap-shanghai region
  DEBUG ─ Using last time deploy service id service-23ybmuq7
  DEBUG ─ Updating service with serviceId service-23ybmuq7.
  DEBUG ─ Bucket "serverless-blog-1256773370" in the "ap-shanghai" region alrea
  
  ………………
  
     - 
        path:   /web/article/watched/update
        method: POST
        apiId:  api-gnvnrbyk
      - 
        path:   /web/sentence/get
        method: POST
        apiId:  api-msvadsau
      - 
        path:   /web/article/list/hot/get
        method: POST
        apiId:  api-kfkrjhim
      - 
        path:   /web/tags/list/get
        method: POST
        apiId:  api-avydagem
      - 
        path:   /admin
        method: ANY
        apiId:  api-4tnz5tc4

  176s › APIService › done

項目總結

傳統博客已經有不少了,不管是基於 PHP 的 zblog 仍是 wp 等開源項目,均可以幫助咱們快速搭建一個博客系統。除了這些博客系統以外,還有不少靜態博客系統。可是就目前而言,基於 Serverless 架構的博客系統仍是比較少見的。

本文經過原生的 Serverless 項目開發與 Flask 框架的部署上 Serverless 實現了一個基於 Python 語言的博客系統。經過該博客系統,用戶能夠發佈文章,自動撰寫文章的關鍵詞和摘要,還能夠進行留言評論的管理。固然,這個博客系統僅做爲工程實踐使用,實際上仍是有一些設計不合理的地方,可是我相信,隨着時間的發展,Serverless 架構愈來愈成熟,基於 Serverless 的開源 Blog 項目或 CMS 項目也會愈來愈多,期待那一天的到來!

Serverless Framework 30 天試用計劃

咱們誠邀您來體驗最便捷的 Serverless 開發和部署方式。在試用期內,相關聯的產品及服務均提供免費資源和專業的技術支持,幫助您的業務快速、便捷地實現 Serverless!

詳情可查閱: Serverless Framework 試用計劃

One More Thing

3 秒你能作什麼?喝一口水,看一封郵件,仍是 —— 部署一個完整的 Serverless 應用?

複製連接至 PC 瀏覽器訪問: https://serverless.cloud.tenc...

3 秒極速部署,當即體驗史上最快的 Serverless HTTP 實戰開發!

傳送門:

歡迎訪問:Serverless 中文網,您能夠在 最佳實踐 裏體驗更多關於 Serverless 應用的開發!


推薦閱讀: 《Serverless 架構:從原理、設計到項目實戰》
相關文章
相關標籤/搜索