利用原生Serverless与Flask框架开发一个博客系统

2020 年 7 月 17 日

利用原生Serverless与Flask框架开发一个博客系统

随着 Serverless 架构越来越火热,其按量付费,弹性伸缩等很多优质特性都让人眼前一亮,不得惊叹云计算为我们带来的便利。但是就目前而言,Serverless 架构相关的业务框架还是比较少的。虽然腾讯云 Serverless 与 Serverless Framework 联手,支持了 Express、Koa、Egg 以及 Flask 等众多项目的轻松上云,但是在实际使用过程中,尤其是迁移过程中还是很痛苦的。而且,从这些框架在 Serverless 组件上的表现来看,POST/GET 的参数传输方法就比较难原生获取,这可能导致原有项目上云要经历很大的演进过程。


不仅是原生框架直接部署在 Serverless 架构上会有困难,直接在 Serverless 架构上开发的框架也很少。所以,本文希望通过一个简单的博客系统开发实践,让大家体验一个基于 Serverless 架构的系统应该是什么样子。


开发前的思考


思考


  • 博客系统需要哪些功能?


这个博客系统仅作为抛砖引玉项目,所以功能比较少,只有两个页面,具有文章管理、分类管理、标签管理以及留言管理等功能,但是为了方便用户管理,要有前台和后台两部分。


  • 前台如何做?


前台的用户流量可能会比较大(相对后台而言),所以这部分使用单独的函数,每个功能一个函数,初步判断前台需要的函数包括:获取文章分类、获取文章列表、获取评论列表、增加评论、获取标签列表等接口。


  • 后台如何做?


理论上来说,后台是管理员的专属地盘,所以这部分流量比较小,可以通过 flask-admin 放入到一个函数中来解决。


  • 为什么前台要那么多函数,后台用一个框架?


整个项目就用一个框架不好吗?首先要申明,整个项目用一个框架是可以的,但是并不好,例如这个项目的后台使用的是 Flask 框架,用了 Flask-admin 来做后台管理,开发过程可能很简单,一百行代码就可以搞定,但是这样会引发其中问题。


网页的返回,需要 APIGW 开启响应集成,响应集成的性能其实很差,所以相对来说,不太适合放在前端。


一个完整的项目比较大,需要的资源也会更多,那么就需要分配给这个函数更多的资源内存,会导致收费的增加。例如后台给的资源是 1024,前端每个函数给的内存资源是 128/256,在执行同样时间的时候,明显后者的费用降低了 4-8 倍。同样,函数会涉及到冷启动,冷启动一个函数和冷启动函数中的一个完整的框架/项目,前者的速度和性能会更好。


函数都有并发上限,如果所有资源全都请求到一个函数,那么就会出现实际用户并发几个的时候,对应的函数并发就已经是达到几十上百。在用户稍微多一点的情况下,就会触及用户实例的上限限制。后台功能是非频繁功能,前台相对来说是更频繁的,所以前台使用单独接口更合理。


  • 登陆功能怎么做?


函数并不像传统开发可以将客户的登录信息缓存到机器上,但是客户端依旧可以使用 cookie,所以利用这个方法,可以做以下流程:


1、后台登录入口处,拉取 APIGW 传过来的 APIGW Event,看其中 headers/cookie 是否存在,不存在就会返回登录页面


2、如果 headers/cookie 存在,取 cookie 中的 Token 字段,判断 Token 字段是否和服务端的 Token 字段吻合,吻合进入系统后台,不吻合返回登录页面


3、用户登录,请求后台的登陆功能,如果账号密码正确,则返回给用户一个 Token,客户端将 Token 记录到 cookie 中。


看完这个流程,大家可能又会有一些问题:


Token 是什么?


Token 可以认为是一个登录凭证,生成方法可以按照自己设计升级,本实践比较简单,就直接用账号密码组合,然后 md5。


Token 存在那里?下次如何获取?


Token 可以存在 MySQL 数据库中,也可以存在 Redis 中,甚至可以存在 COS 中。Redis 和 COS 都可以利用其自身特性做一些额外的操作,例如数据有效期(用来做登录过期等)。本文实践比较简单,所以每次用户请求过来都是单独计算 Token,然后进行对比。


这种 Token 登陆方法可以用于其他项目吗?


当然可以用于其他项目,例如我自己的 Anycodes 也是通过 Token 进行鉴权。不过,在 Serverless 架构下 Token 如何存储是一个问题,我个人的建议是有钱就用 Redis,没钱就用 COS,不想额外花钱就像我,每次是用单独对比。


Token 存储在 Redis 可以理解,但是存在 COS 是为什么?


COS 本身是对象存储,用来存储文件的,所以存储 Token 是完全没有问题的。如果我们每次生成一个新的 Token,就把这个 Token 设置为一个文件,文件内容就是 Token 对应的用户信息、权限信息或者其他信息,然后存储桶策略设置成文件过期时间,例如文件存入 1 天自动删除。当用户 Token 到达时,直接通过内网请求 COS(没有流量费)获取指定文件名,如果获取到了就下载回来,然后进行其他操作,不存在就证明用户已过期或者 Token 错误,提示用户重新登录。当然,这种方法可能不是最优解,但是在 Serverless 条件下的一个有趣做法,可以在小项目中尝试使用。


  • 项目本地开发如何进行调试?


众所周知,Serverless 架构的本地调试很难。确实如此,虽说本地调试很困难,但也不是完全不能做的,可以根据项目的需求来做调试策略。


项目开发


数据库设计


由于这是一个简单的博客,所以数据库的设计也相对比较简单,只有文章表、分类表、标签表、评论表等,整体的 ER 图如下所示:



项目开发


项目开发过程主要是数据库的增删改查,为了适应 Serverless 架构下的项目开发、提高项目的开发效率,这里总结了一些相关的开发技巧和经验:


本地开发与调试


针对开发调试,我们在每个函数后都增加了对应触发器的调试方案,例如 APIGW 触发器。


增加代码:


def test():    event = {        "requestContext": {            "serviceId": "service-f94sy04v",            "path": "/test/{path}",            "httpMethod": "POST",            "requestId": "c6af9ac6-7b61-11e6-9a41-93e8deadbeef",            "identity": {                "secretId": "abdcdxxxxxxxsdfs"            },            "sourceIp": "14.17.22.34",            "stage": "release"        },        "headers": {            "Accept-Language": "en-US,en,cn",            "Accept": "text/html,application/xml,application/json",            "Host": "service-3ei3tii4-251000691.ap-guangzhou.apigateway.myqloud.com",            "User-Agent": "User Agent String"        },        "body": json.dumps({"id": 1}),         .... ....     }    print(main_handler(event, None))

if __name__ == "__main__": test()

复制代码


如果想看运行效果,可以执行以下文件:


{'id': 1, 'title': '', 'watched': 1, 'category': '热点新闻', 'publish': '2020-02-13 00:45:52', 'tags': [], 'next': {}, 'pre': {}}{'uuid': '749ca9f6-4dfb-11ea-9c5b-acde48001122', 'error': False, 'message': ''}
复制代码


接下来,我们可以在本地模拟线上环境。下面的这个做法可以用于绝大部分函数,但如果遇到需要内网资源的函数,这个做法可能就比较麻烦。


后台的 Flaks 框架部分:


def test():    event = {'body': 'name=sdsadasdsadasd&remark=', 'headerParameters': {}, 'headers': {        'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9',        'accept-encoding': 'gzip, deflate', 'accept-language': 'zh-CN,zh;q=0.9', 'cache-control': 'no-cache',        'connection': 'keep-alive', 'content-length': '27', 'content-type': 'application/x-www-form-urlencoded',        'cookie': 'Hm_lvt_a0c900918361b31d762d9cf4dc81ee5b=1574491278,1575257377', 'endpoint-timeout': '15',        'host': 'blog.0duzhan.com', 'origin': 'http://blog.0duzhan.com', 'pragma': 'no-cache',        'proxy-connection': 'keep-alive', 'referer': 'http://blog.0duzhan.com/admin/tag/new/?url=%2Fadmin%2Ftag%2F',        'upgrade-insecure-requests': '1',        'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36',        'x-anonymous-consumer': 'true', 'x-api-requestid': '656622f3b008a0d406a376809b03b52c',        'x-b3-traceid': '656622f3b008a0d406a376809b03b52c', 'x-qualifier': '$LATEST'}, 'httpMethod': 'POST',             'path': '/admin/tag/new/', 'pathParameters': {}, 'queryString': {'url': '/admin/tag/'},             'queryStringParameters': {},             'requestContext': {'httpMethod': 'ANY', 'identity': {}, 'path': '/admin', 'serviceId': 'service-23ybmuq7',                                'sourceIp': '119.123.224.87', 'stage': 'release'}}    print(main_handler(event, None))

if __name__ == "__main__": test()
复制代码


index 执行结果:


{'body': 'name=sdsadasdsadasd&remark=', 'headerParameters': {}, 'headers': {'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', 'accept-encoding': 'gzip, deflate', 'accept-language': 'zh-CN,zh;q=0.9', 'cache-control': 'no-cache', 'connection': 'keep-alive', 'content-length': '27', 'content-type': 'application/x-www-form-urlencoded', 'cookie': 'Hm_lvt_a0c900918361b31d762d9cf4dc81ee5b=1574491278,1575257377', 'endpoint-timeout': '15', 'host': 'blog.0duzhan.com', 'origin': 'http://blog.0duzhan.com', 'pragma': 'no-cache', 'proxy-connection': 'keep-alive', 'referer': 'http://blog.0duzhan.com/admin/tag/new/?url=%2Fadmin%2Ftag%2F', 'upgrade-insecure-requests': '1', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/79.0.3945.130 Safari/537.36', 'x-anonymous-consumer': 'true', 'x-api-requestid': '656622f3b008a0d406a376809b03b52c', 'x-b3-traceid': '656622f3b008a0d406a376809b03b52c', 'x-qualifier': '$LATEST'}, 'httpMethod': 'POST', 'path': '/admin/tag/new/', 'pathParameters': {}, 'queryString': {'url': '/admin/tag/'}, 'queryStringParameters': {}, 'requestContext': {'httpMethod': 'ANY', 'identity': {}, 'path': '/admin', 'serviceId': 'service-23ybmuq7', 'sourceIp': '119.123.224.87', 'stage': 'release'}}{'isBase64Encoded': False, 'statusCode': 200, 'headers': {'Content-Type': 'text/html'}, 'body': '<!DOCTYPE html>\n<html lang="en">\n<head>\n    <meta charset="UTF-8">\n    <title>Title</title>\n    <script>\n        var url = window.location.href\n        url = url.split("admin")[0] + "admin"\n        String.prototype.endWith = function (s) {\n            var d = this.length - s.length;\n            return (d >= 0 && this.lastIndexOf(s) == d)\n        }\n        if (window.location.href != url) {\n            if (!window.location.href.endsWith("admin") || !window.location.href.endsWith("admin/"))\n                window.location = url\n        }\n\n        function doLogin() {\n            var xmlhttp = window.XMLHttpRequest ? (new XMLHttpRequest()) : (new ActiveXObject("Microsoft.XMLHTTP"))\n            xmlhttp.onreadystatechange = function () {\n                if (xmlhttp.readyState == 4 && xmlhttp.status == 200) {\n                    if (JSON.parse(xmlhttp.responseText)["Token"]) {\n                        document.cookie = "Token=" + JSON.parse(xmlhttp.responseText)["Token"];\n                        window.location = `http://${window.location.host}/admin`\n                    } else {\n                        alert(JSON.parse(xmlhttp.responseText)["message"])\n                    }\n                }\n            }\n            xmlhttp.open("POST", window.location.pathname, true);\n            xmlhttp.setRequestHeader("Content-type", "application/json");\n            xmlhttp.send(JSON.stringify({\n                "username": document.getElementById("username").value,\n                "password": document.getElementById("password").value,\n            }));\n        }\n    </script>\n</head>\n<body>\n\n<center><h1>Serverless Blog 后台管理</h1>\n    管理账号:<input type="text" id="username"><br>\n    管理密码:<input type="password" id="password"><br>\n    <input type="reset"><input type="submit" onclick="doLogin()"><br>\n</center>\n</body>\n</html>'}
复制代码


Flask 部署


将 Flask 部署到 Serverless 架构可以使用 @serverless/tencent-flask 实现。不过,为了让大家能够更深入了解传统框架是如何部署到 Serverless 架构,本文选择了自行’造轮子’实现。



通常情况下,我们使用 Flask 等框架是需要通过 web_server 来进入到下一个环节。但云函数更多的是一个函数,本不需要启动 web server,所以可以直接调用 wsgi_app 方法,其中 environ 就是对 event/context 等进行处理后的对象,start_response 可以认为是一种特殊的数据结构,例如 response 结构形态等。


如果不使用腾讯云 flask-component,我们可以这样实现:


# -*- coding: utf-8 -*-# Copyright 2016 Matt Martz# All Rights Reserved.##    Licensed under the Apache License, Version 2.0 (the "License"); you may#    not use this file except in compliance with the License. You may obtain#    a copy of the License at##         http://www.apache.org/licenses/LICENSE-2.0##    Unless required by applicable law or agreed to in writing, software#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the#    License for the specific language governing permissions and limitations#    under the License.
import sysimport json
try: from urllib import urlencodeexcept ImportError: from urllib.parse import urlencode
from flask import Flask
try: from cStringIO import StringIOexcept ImportError: try: from StringIO import StringIO except ImportError: from io import StringIO
from werkzeug.wrappers import BaseRequest
__version__ = '0.0.4'

def make_environ(event): environ = {}
for hdr_name, hdr_value in event['headers'].items(): hdr_name = hdr_name.replace('-', '_').upper() if hdr_name in ['CONTENT_TYPE', 'CONTENT_LENGTH']: environ[hdr_name] = hdr_value continue
http_hdr_name = 'HTTP_%s' % hdr_name environ[http_hdr_name] = hdr_value
apigateway_qs = event['queryStringParameters'] request_qs = event['queryString'] qs = apigateway_qs.copy() qs.update(request_qs)
body = '' if 'body' in event: body = event['body']
environ['REQUEST_METHOD'] = event['httpMethod'] environ['PATH_INFO'] = event['path'] environ['QUERY_STRING'] = urlencode(qs) if qs else '' environ['REMOTE_ADDR'] = 80 environ['HOST'] = event['headers']['host'] environ['SCRIPT_NAME'] = ''
environ['SERVER_PORT'] = 80 environ['SERVER_PROTOCOL'] = 'HTTP/1.1'
environ['CONTENT_LENGTH'] = str(len(body))
environ['wsgi.url_scheme'] = '' environ['wsgi.input'] = StringIO(body) environ['wsgi.version'] = (1, 0) environ['wsgi.errors'] = sys.stderr environ['wsgi.multithread'] = False environ['wsgi.run_once'] = True environ['wsgi.multiprocess'] = False
BaseRequest(environ)
return environ

class LambdaResponse(object): def __init__(self): self.status = None self.response_headers = None
def start_response(self, status, response_headers, exc_info=None): self.status = int(status[:3]) self.response_headers = dict(response_headers)

class FlaskLambda(Flask): def __call__(self, event, context): if 'httpMethod' not in event: print('httpMethod not in event') # In this "context" `event` is `environ` and # `context` is `start_response`, meaning the request didn't # occur via API Gateway and Lambda return super(FlaskLambda, self).__call__(event, context)
response = LambdaResponse() # print response.start_response
body = next(self.wsgi_app( make_environ(event), response.start_response ))
# return { # "isBase64Encoded": False, # "statusCode": 200, # "headers": {'Content-Type': 'text/html'}, # "body": body # }
return { 'statusCode': response.status, 'headers': response.response_headers, 'body': body }

复制代码


以上代码实现的是将 APIGW 请求转变成请求集成的形式,传送给 Flask 框架,用户可以通过 request.form 来获取 post 内容,通过 request.args 获取 get 内容等。


全局变量


全局变量可能包括用户账号、密码、云的密钥信息、数据库信息等。为了统一配置和修改,这里使用了自己写的全局变量组件:


# 函数们的整体配置信息Conf:  component: "serverless-global"  inputs:    region: ap-shanghai    runtime: Python3.6    handler: index.main_handler    include_common: ./common    blog_user: Dfounder    blog_email: service@anycodes.cn    blog_about_me: 这就是我的博客    blog_host: blog.0duzhan.com    website_title: Serverless Blog System    website_keywords: Serverless, Serverless Framework, Tencent Cloud, SCF    website_description: 一款基于腾讯云Serverless架构,并且采用Serverless Framework构建的Serverless博客系统。    website_bucket: serverless-blog-1256773370    mysql_host:     mysql_user: root    mysql_password:     mysql_port: 60510    mysql_db: serverless_blog_system    admin_user: mytest    admin_password: mytestabc    tencent_secret_id:     tencent_secret_key:     tencent_appid: 
复制代码


有些函数可以直接使用:


Blog_Web_addComment:  component: "@serverless/tencent-scf"  inputs:    name: Blog_Web_addComment    description: 添加评论    codeUri: ./cloudFunctions/addComment    handler: ${Conf.handler}    runtime: ${Conf.runtime}    region:  ${Conf.region}    include:      - ${Conf.include_common}    environment:      variables:        mysql_host: ${Conf.mysql_host}        mysql_port: ${Conf.mysql_port}        mysql_user: ${Conf.mysql_user}        mysql_password: ${Conf.mysql_password}        mysql_db: ${Conf.mysql_db}
复制代码


让项目初始化更容易


为了让项目更容易初始化,例如修改网站名字、描述、关键词、建立数据库等,我单独做了一个 init 文件:


# -*- coding: utf8 -*-import pymysqlimport shutilimport yamlimport os

def setEnv(): try: file = open("./serverless.yaml", 'r', encoding="utf-8") file_data = file.read() file.close()
data = yaml.load(file_data) for eveKey, eveValue in data['Conf']['inputs'].items(): os.environ[eveKey] = str(eveValue) return True except Exception as e: raise e

def initDb(): try: conn = pymysql.connect(host=os.environ.get('mysql_host'), user=os.environ.get('mysql_user'), password=os.environ.get('mysql_password'), port=int(os.environ.get('mysql_port')), charset='utf8') cursor = conn.cursor() sql = "CREATE DATABASE IF NOT EXISTS {db_name}".format(db_name=os.environ.get('mysql_db')) cursor.execute(sql) cursor.close() conn.close() return True except Exception as e: raise e

def initTable(): try: conn = pymysql.connect(host=os.environ.get('mysql_host'), user=os.environ.get('mysql_user'), password=os.environ.get('mysql_password'), port=int(os.environ.get('mysql_port')), db=os.environ.get('mysql_db'), charset='utf8', cursorclass=pymysql.cursors.DictCursor, autocommit=1) cursor = conn.cursor() createTags = "CREATE TABLE `tags` ( `tid` INT NOT NULL AUTO_INCREMENT , `name` VARCHAR(255) NOT NULL , `remark` TEXT NULL , PRIMARY KEY (`tid`), UNIQUE (`name`)) ENGINE = InnoDB;" createCategory = "CREATE TABLE `category` ( `cid` INT NOT NULL AUTO_INCREMENT , `name` VARCHAR(255) NOT NULL , `sorted` INT NOT NULL DEFAULT '1' , `remark` TEXT NULL , PRIMARY KEY (`cid`), UNIQUE (`name`)) ENGINE = InnoDB;" createComments = "CREATE TABLE `comments` ( `cid` INT NOT NULL AUTO_INCREMENT , `content` TEXT NOT NULL , `publish` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP , `user` VARCHAR(255) NOT NULL , `email` VARCHAR(255) NULL , `photo` INT NOT NULL DEFAULT '0' , `article` INT NOT NULL , `remark` TEXT NULL , `uni_mark` VARCHAR(255) NOT NULL , `is_show` INT NOT NULL DEFAULT '0' , PRIMARY KEY (`cid`), UNIQUE (`uni_mark`)) ENGINE = InnoDB;" createArticle = "CREATE TABLE `article` ( `aid` INT NOT NULL AUTO_INCREMENT , `title` VARCHAR(255) NOT NULL , `content` TEXT NOT NULL , `description` TEXT NOT NULL , `publish` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP , `watched` INT NOT NULL DEFAULT '0' , `category` INT NOT NULL , `remark` TEXT NULL , PRIMARY KEY (`aid`)) ENGINE = InnoDB;" createArticleTags = "CREATE TABLE `article_tags` ( `atid` INT NOT NULL AUTO_INCREMENT , `aid` INT NOT NULL , `tid` INT NOT NULL , PRIMARY KEY (`atid`)) ENGINE = InnoDB;" alertArticleTagsArticle = "ALTER TABLE `article_tags` ADD CONSTRAINT `article` FOREIGN KEY (`aid`) REFERENCES `article`(`aid`) ON DELETE CASCADE ON UPDATE CASCADE; " alertArticleTagsTags = "ALTER TABLE `article_tags` ADD CONSTRAINT `tags` FOREIGN KEY (`tid`) REFERENCES `tags`(`tid`) ON DELETE CASCADE ON UPDATE CASCADE;" alertArticleCategory = "ALTER TABLE `article` ADD CONSTRAINT `category` FOREIGN KEY (`category`) REFERENCES `category`(`cid`) ON DELETE CASCADE ON UPDATE CASCADE;" alertCommentsArticle = "ALTER TABLE `comments` ADD CONSTRAINT `article_comments` FOREIGN KEY (`article`) REFERENCES `article`(`aid`) ON DELETE CASCADE ON UPDATE CASCADE;" cursor.execute(createTags) cursor.execute(createCategory) cursor.execute(createComments) cursor.execute(createArticle) cursor.execute(createArticleTags) cursor.execute(alertArticleTagsArticle) cursor.execute(alertArticleTagsTags) cursor.execute(alertArticleCategory) cursor.execute(alertCommentsArticle) cursor.close() conn.close() return True except Exception as e: raise e

def initHTML(): try: tempPath = "website" tempDist = os.path.join(tempPath, "dist") if os.path.exists(tempDist): shutil.rmtree(tempDist) tempFileList = [] for eve in os.walk(tempPath): if eve[2]: for eveFile in eve[2]: tempFileList.append(os.path.join(eve[0], eveFile)) os.mkdir(tempDist) for eve in tempFileList: temp = os.path.split(eve.replace(tempPath, tempDist)) if not os.path.exists(temp[0]): os.makedirs(temp[0]) if eve.endswith(".html") or eve.endswith(".htm"): with open(eve) as readData: with open(eve.replace(tempPath, tempDist), "w") as writeData: writeData.write(readData.read(). replace('{{ user }}', os.environ.get('blog_user')). replace('{{ email }}', os.environ.get('blog_email')). replace('{{ title }}', os.environ.get('website_title')). replace('{{ keywords }}', os.environ.get('website_keywords')). replace('{{ about_me }}', os.environ.get('blog_about_me')). replace('{{ host }}', os.environ.get('blog_host')). replace('{{ description }}', os.environ.get('website_description'))) else: shutil.copy(eve, eve.replace(tempPath, tempDist)) return True except Exception as e: raise e

if __name__ == "__main__": print("获取Yaml数据: ", setEnv()) print("建立数据库:", initDb()) print("建立数据库:", initTable()) print("初始化HTML:", initHTML())

复制代码


公共组件的开发


项目中会有很多公共组件,例如数据库,所以我把数据库的代码统一放在了一起:common/mysqlCommon.py:


# -*- coding: utf8 -*-
import osimport reimport pymysqlimport hashlibfrom random import choice

class mysqlCommon: def __init__(self): self.getConnection({ "host": os.environ.get('mysql_host'), "user": os.environ.get('mysql_user'), "port": int(os.environ.get('mysql_port')), "db": os.environ.get('mysql_db'), "password": os.environ.get('mysql_password') })
def getDefaultPic(self): return choice([ 'http://t8.baidu.com/it/u=1484500186,1503043093&fm=79&app=86&f=JPEG?w=1280&h=853', 'http://t8.baidu.com/it/u=2247852322,986532796&fm=79&app=86&f=JPEG?w=1280&h=853', 'http://t7.baidu.com/it/u=3204887199,3790688592&fm=79&app=86&f=JPEG?w=4610&h=2968', 'http://t9.baidu.com/it/u=3363001160,1163944807&fm=79&app=86&f=JPEG?w=1280&h=830', 'http://t9.baidu.com/it/u=583874135,70653437&fm=79&app=86&f=JPEG?w=3607&h=2408', 'http://t9.baidu.com/it/u=583874135,70653437&fm=79&app=86&f=JPEG?w=3607&h=2408', 'http://t9.baidu.com/it/u=1307125826,3433407105&fm=79&app=86&f=JPEG?w=5760&h=3240', 'http://t9.baidu.com/it/u=2268908537,2815455140&fm=79&app=86&f=JPEG?w=1280&h=719', 'http://t7.baidu.com/it/u=1179872664,290201490&fm=79&app=86&f=JPEG?w=1280&h=854', 'http://t9.baidu.com/it/u=3949188917,63856583&fm=79&app=86&f=JPEG?w=1280&h=875', 'http://t9.baidu.com/it/u=2266751744,4253267866&fm=79&app=86&f=JPEG?w=1280&h=854', 'http://t8.baidu.com/it/u=4100756023,1345858297&fm=79&app=86&f=JPEG?w=1280&h=854', 'http://t7.baidu.com/it/u=1355385882,1155324943&fm=79&app=86&f=JPEG?w=1280&h=854', 'http://t9.baidu.com/it/u=2292037961,3689236171&fm=79&app=86&f=JPEG?w=1280&h=854', 'http://t9.baidu.com/it/u=4241966675,2405819829&fm=79&app=86&f=JPEG?w=1280&h=854', 'http://t8.baidu.com/it/u=2857883419,1187496708&fm=79&app=86&f=JPEG?w=1280&h=763', 'http://t8.baidu.com/it/u=198337120,441348595&fm=79&app=86&f=JPEG?w=1280&h=732' ])
def getConnection(self, conf): self.connection = pymysql.connect(host=conf['host'], user=conf['user'], password=conf['password'], port=int(conf['port']), db=conf['db'], charset='utf8', cursorclass=pymysql.cursors.DictCursor, autocommit=1)
def doAction(self, stmt, data): try: self.connection.ping(reconnect=True) cursor = self.connection.cursor() cursor.execute(stmt, data) result = cursor cursor.close() return result except Exception as e: print(e) try: cursor.close() except: pass return False
def getCategoryList(self): search_stmt = ( "SELECT * FROM `category` ORDER BY `sorted`" ) result = self.doAction(search_stmt, ()) if result == False: return False return [{"id": eveCategory['cid'], "name": eveCategory['name']} for eveCategory in result.fetchall()]
def getArticleList(self, category, tag, page=1): if category: search_stmt = ( "SELECT article.*,category.name FROM `article` LEFT JOIN `category` ON article.category=category.cid WHERE article.category=%s ORDER BY -article.aid LIMIT %s,%s;" ) count_stmt = ( "SELECT COUNT(*) FROM `article` LEFT JOIN `category` ON article.category=category.cid WHERE article.category=%s;" ) data = (category, 10 * (int(page) - 1), 10 * int(page)) count_data = (category,) elif tag: search_stmt = ( "SELECT article.* FROM `article` LEFT JOIN `article_tags` ON article.aid=article_tags.aid WHERE article_tags.tid=%s ORDER BY -article.aid LIMIT %s,%s;" ) count_stmt = ( "SELECT COUNT(*) FROM `article`LEFT JOIN `article_tags` ON article.aid=article_tags.aid WHERE article_tags.tid=%s;" ) data = (tag, 10 * (int(page) - 1), 10 * int(page)) count_data = (tag,) else: search_stmt = ( "SELECT article.*,category.name FROM `article` LEFT JOIN `category` ON article.category=category.cid ORDER BY -article.aid LIMIT %s,%s;" ) count_stmt = ( "SELECT COUNT(*) FROM `article` LEFT JOIN `category` ON article.category=category.cid; " ) data = (10 * (int(page) - 1), 10 * int(page)) count_data = () result = self.doAction(search_stmt, data) if result == False: return False
return {"data": [{"id": eveArticle['aid'], "title": eveArticle['title'], "description": eveArticle['description'], "watched": eveArticle['watched'], "category": eveArticle['category'], "publish": str(eveArticle['publish']), "picture": self.getPicture(eveArticle['content'])} for eveArticle in result.fetchall()], "count": self.doAction(count_stmt, count_data).fetchone()["COUNT(*)"]}
def getHotArticleList(self): search_stmt = ( "SELECT article.*,category.name FROM `article` LEFT JOIN `category` ON article.category=category.cid ORDER BY article.watched LIMIT 0,5" ) result = self.doAction(search_stmt, ()) if result == False: return False return [{"id": eveArticle['aid'], "title": eveArticle['title'], "description": eveArticle['description'], "watched": eveArticle['watched'], "category": eveArticle['category'], "publish": str(eveArticle['publish']), "picture": self.getPicture(eveArticle['content'])} for eveArticle in result.fetchall()]
def getTagsArticle(self, aid): search_stmt = ( "SELECT tags.name, tags.tid FROM `article_tags` LEFT JOIN `tags` ON article_tags.tid=tags.tid WHERE article_tags.aid=%s;" ) result = self.doAction(search_stmt, (aid,)) if result == False: return False return [{"id": eveTag["tid"], "name": eveTag["name"]} for eveTag in result.fetchall()]
def getTagsList(self): search_stmt = ( "SELECT * FROM tags ORDER BY RAND() LIMIT 20; " ) result = self.doAction(search_stmt, ()) if result == False: return False return [{"id": eveTag['tid'], "name": eveTag['name']} for eveTag in result.fetchall()]
def getArticleContent(self, aid): search_stmt = ( "SELECT article.*, category.name FROM `category` LEFT JOIN `article` ON category.cid=article.category WHERE article.aid=%s;" ) result = self.doAction(search_stmt, (aid)) if result == False: return False article = result.fetchone() return { "id": article["aid"], "title": article["title"], "content": article["content"], "description": article["description"], "watched": article["watched"], "category": article["name"], "publish": str(article["publish"]), "tags": self.getTagsArticle(article["aid"]), "next": self.getOtherArticle(aid, "next"), "pre": self.getOtherArticle(aid, "pre") } if article else {}
def getOtherArticle(self, aid, articleType): search_stmt = ( "SELECT * FROM `article` WHERE aid=(select max(aid) from `article` where aid>%s)" ) if articleType == "next" else ( "SELECT * FROM `article` WHERE aid=(select max(aid) from `article` where aid<%s)" ) result = self.doAction(search_stmt, (aid)) if result == False: return False article = result.fetchone() return { "id": article["aid"], "title": article["title"] } if article else {}
def getComments(self, aid): search_stmt = ( "SELECT * FROM `comments` WHERE article=%s AND is_show=1 ORDER BY -cid LIMIT 100;" ) result = self.doAction(search_stmt, (aid)) if result == False: return False return [{"content": eveComment['content'], "publish": str(eveComment['publish']), "user": eveComment['user'], "remark": eveComment['remark']} for eveComment in result.fetchall()]
def addComment(self, content, user, email, aid): insert_stmt = ( "INSERT INTO `comments` (`cid`, `content`, `publish`, `user`, `email`, `article`, `uni_mark`) " "VALUES (NULL, %s, CURRENT_TIMESTAMP, %s, %s, %s, %s)" ) result = self.doAction(insert_stmt, (content, user, email, aid, hashlib.md5( ("%s----%s----%s----%s" % (str(content), str(user), str(email), str(aid))).encode("utf-8")).hexdigest())) return False if result == False else True
def updateArticleWatched(self, wid): update_stmt = ( "UPDATE `article` SET `watched`=`watched`+1 WHERE `aid` = %s" ) return False if self.doAction(update_stmt, (wid)) == False else True
def getPicture(self, content): resultList =[eve[1] for eve in re.findall('<img(.*?)src="(.*?)"(.*?)>', content)] return resultList[0] if resultList else self.getDefaultPic()

def getTag(self, tag): search_stmt = ( "SELECT * FROM `tags` WHERE name=%s;" ) result = self.doAction(search_stmt, (tag,)) return False if not result or result.rowcount == 0 else result.fetchone()['tid']
def addTag(self, tag): insert_stmt = ( "INSERT INTO `tags` (`tid`, `name`, `remark`) " "VALUES (NULL, %s, NULL)" ) result = self.doAction(insert_stmt, (tag)) return False if result == False else result.lastrowid
def addArticleTag(self, article, tag): insert_stmt = ( "INSERT INTO `article_tags` (`atid`, `aid`, `tid`) " "VALUES (NULL, %s, %s)" ) result = self.doAction(insert_stmt, (article, tag)) return False if result == False else True
复制代码


这里实现的是数据库增删改查的全部功能(admin 除外),在使用的时候分为本地和线上:



try: import returnCommon from mysqlCommon import mysqlCommonexcept: import common.testCommon
common.testCommon.setEnv()
import common.returnCommon as returnCommon from common.mysqlCommon import mysqlCommon
mysql = mysqlCommon()
复制代码


利用 Python 异常,如果导入没找到,那就说明是本地测试,如果from mysqlCommon import mysqlCommon找到了,那就说明是线上环境。除了数据库的公共组件,还会有returnCommon等公共文件。 这些文件在使用的时候也需要打包进入,可以在 yaml 中增加 include,例如:


Blog_Web_addComment:  component: "@serverless/tencent-scf"  inputs:    name: Blog_Web_addComment    description: 添加评论    codeUri: ./cloudFunctions/addComment    handler: ${Conf.handler}    runtime: ${Conf.runtime}    region:  ${Conf.region}    include:      - ${Conf.include_common}
复制代码


功能展示


前台功能


  • 列表页



  • 内容页



后台功能


  • 登录功能



  • 列表页



  • 表单页



项目部署


  • 配置serverless.yaml


# 函数们的整体配置信息Conf:  component: "serverless-global"  inputs:    region: ap-shanghai    runtime: Python3.6    handler: index.main_handler    include_common: ./common    blog_user: Dfounder    blog_email: service@anycodes.cn    website_title: Serverless Blog System    website_keywords: Serverless, Serverless Framework, Tencent Cloud, SCF    website_description: 一款基于腾讯云Serverless架构,并且采用Serverless Framework构建的Serverless博客系统。    website_bucket: serverless-blog-1256773370    mysql_host:     mysql_password:     mysql_port:     mysql_db:     admin_user: mytest    admin_password: mytest
复制代码


除此之外,还需要关注域名问题:


# 网站CosBucket:  component: '@serverless/tencent-website'  inputs:    code:      root: website/dist      src: ./      index: list.html    region:  ${Conf.region}    bucketName: ${Conf.website_bucket}    hosts:      - host: 0duzhan.com        https:          certId: awPsOIHY          forceSwitch: -1      - host: www.0duzhan.com        https:          certId: awPsOIHY          forceSwitch: -1
env: apiUrl: ${APIService.subDomain}
复制代码


API 网关内容:


# 创建API网关ServiceAPIService:  component: "@serverless/tencent-apigateway"  inputs:    region: ${Conf.region}    customDomain:      - domain: api.0duzhan.com        isDefaultMapping: 'FALSE'        pathMappingSet:          - path: /            environment: release        protocols:          - http    protocols:      - http      - https    ........
复制代码


这里可以将这两部分域名可以修改成自己的。


  • 执行init.py:


这里需要注意的是,该示例是在 MacOS 下开发的,所以 init.py 应该可在 mac/linux 运行,windows 用户需要做适当修改。


另外,还需要安装一个依赖:pyyaml:


获取Yaml数据:  True建立数据库: True建立数据库: True初始化HTML: True
复制代码


  • 部署资源,执行serverless --debug


(venv) ServerlessBlog:ServerlessBlog dfounderliu$ sls --debug
DEBUG ─ Resolving the template's static variables. DEBUG ─ Collecting components from the template. DEBUG ─ Downloading any NPM components found in the template. DEBUG ─ Analyzing the template's components dependencies. DEBUG ─ Creating the template's components graph. DEBUG ─ Syncing template state. DEBUG ─ Executing the template's components graph. DEBUG ─ Preparing website Tencent COS bucket serverless-blog-1256773370. DEBUG ─ Starting API-Gateway deployment with name APIService in the ap-shanghai region DEBUG ─ Using last time deploy service id service-23ybmuq7 DEBUG ─ Updating service with serviceId service-23ybmuq7. DEBUG ─ Bucket "serverless-blog-1256773370" in the "ap-shanghai" region alrea ……………… - path: /web/article/watched/update method: POST apiId: api-gnvnrbyk - path: /web/sentence/get method: POST apiId: api-msvadsau - path: /web/article/list/hot/get method: POST apiId: api-kfkrjhim - path: /web/tags/list/get method: POST apiId: api-avydagem - path: /admin method: ANY apiId: api-4tnz5tc4
176s › APIService › done

复制代码


项目总结


目前,传统博客已经有很多了,基于 PHP 的 zblog、wp 等开源项目都可以帮助我们快速搭建一个博客系统,但是基于 Serverless 架构的博客系统是比较少见的。


本文通过原生 Serverless 项目开发和 Flask 框架部署实现了一个基于 Python 语言的博客系统,在这个系统实现了文章发布、自动撰写文章的关键词和摘要,还可以进行留言评论的管理。不过,这个博客系统仅作为工程实践使用,所以仍然会存在 BUG 或者设计不合理的地方,如果要是实际应用,还需接着完善。


2020 年 7 月 17 日 11:192543

评论

发布
暂无评论
发现更多内容

架构师训练营 - 第一周作业二

teslə

食堂就餐卡系统设计

王新涵

week01小结

Geek_196d0f

食堂就餐卡系统

chenzt

架构师训练营0期第一周学习总结

王新涵

训练营第一周作业

Mr冰凉

UML示例

Geek_196d0f

【第一周】命题作业——食堂就餐卡系统设计

三尾鱼

学习 极客大学架构师训练营

第一周学习总结

15359861984

架构师训练营第一周作业-王韬

W_T

极客大学架构师训练营 作业

食堂就餐卡系统

SuperLab

食堂就餐卡系统设计文档

15359861984

训练营第一章笔记

Mr冰凉

食堂就餐卡系统设计

林昱榕

极客大学架构师训练营 食堂就餐卡系统设计

架构师训练营- 第一周 - 学习总结

stardust20

架构师训练营-week1-学习总结

晓-Michelle

极客大学架构师训练营

2020-06-06-食堂就餐卡系统设计

路易斯李李李

假如黑夜来临

zhoo299

随笔 随笔杂谈

极客时间架构师训练营 - week1 - 作业1

jjn0703

极客大学架构师训练营

【总结】架构师如何做架构

魔曦

架构 架构师 极客大学架构师训练营

食堂就餐卡系统设计

娄江国

极客大学架构师训练营

架构师训练营0期第1周-总结+作业

林毋梦

极客大学架构师训练营

架构师 - 入学篇

X﹏X

极客大学架构师训练营

架构师训练营第一周总结

allen

架构师训练营-week1-作业-食堂就餐卡系统架构设计

晓-Michelle

极客大学架构师训练营

第一周作业 食堂就餐卡系统uml图

战峰

食堂就餐卡系统设计

胡江涛

系统设计 极客大学架构师训练营 就餐卡

第一周总结

chenzt

架构师训练营第一次作业

曾祥斌

食堂就餐系统设计

Glowry

极客大学架构师训练营

架构师训练营 第一周 食堂就餐卡系统设计

且听且吟

极客大学架构师训练营

利用原生Serverless与Flask框架开发一个博客系统-InfoQ