Serverless 白皮书中曾描述过 Serverless 的一些缺点,例如难以调试、冷启动严重等等。其中难以调试是表现在多个方面的,有一个方面是日志输出。
当我们把 Serverless 架构应用于实际项目,就会发现调试成为了效率的重要影响因素。以日志输出为例,某个函数被触发之后未得到预期结果,大家第一想法就是查看日志,但这时输出的日志可能并未是我们想要的,而且云厂商输出日志的延时也非常高。
日志输出现状
以腾讯云云函数为例,我们可以看一下其日志输出情况:
通过控制台或者是云 API 的 Invoke 接口触发云函数:
通过这个测试
功能,可以很快获取到函数的结果,并查看日志信息。
通过 API 网关、COS 等触发云函数,此处以 API 网关为例:
通过网关触发一个函数:
通过函数日志查看何时会刷出这个日志:
这个过程大概有 11S,通过代码来进行更加详细的测试:
import json,time
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.scf.v20180416 import scf_client, models
try:
cred = credential.Credential("", "")
httpProfile = HttpProfile()
httpProfile.endpoint = "scf.tencentcloudapi.com"
clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
client = scf_client.ScfClient(cred, "ap-guangzhou", clientProfile)
req = models.InvokeRequest()
params = '{"FunctionName":"test"}'
req.from_json_string(params)
resp = client.Invoke(req)
functionRequestId = json.loads(resp.to_json_string())["Result"][ "FunctionRequestId"]
print(time.time(), functionRequestId)
while True:
time.sleep(0.2)
req = models.GetFunctionLogsRequest()
params = '{"FunctionName":"test"}'
req.from_json_string(params)
resp = client.GetFunctionLogs(req)
if functionRequestId in str(resp.to_json_string()):
break
print(time.time())
except TencentCloudSDKException as err:
print(err)
输出结果:
1584108001.141546 ee7243dd-6532-11ea-8bce-5254000c8aa4
1584108005.2496068
这次输出结果是 4S,再做一个多次调用的时间对比图:
import json
import time
import numpy
import matplotlib.pyplot as plt
from tencentcloud.common import credential
from tencentcloud.common.profile.client_profile import ClientProfile
from tencentcloud.common.profile.http_profile import HttpProfile
from tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKException
from tencentcloud.scf.v20180416 import scf_client, models
try:
cred = credential.Credential("", "")
httpProfile = HttpProfile()
httpProfile.endpoint = "scf.tencentcloudapi.com"
clientProfile = ClientProfile()
clientProfile.httpProfile = httpProfile
client = scf_client.ScfClient(cred, "ap-guangzhou", clientProfile)
timeList = []
for i in range(0, 100):
req = models.InvokeRequest()
params = '{"FunctionName":"test"}'
req.from_json_string(params)
resp = client.Invoke(req)
functionRequestId = json.loads(resp.to_json_string())["Result"]["FunctionRequestId"]
startTime = int(time.time())
while True:
time.sleep(0.2)
req = models.GetFunctionLogsRequest()
params = '{"FunctionName":"test"}'
req.from_json_string(params)
resp = client.GetFunctionLogs(req)
if functionRequestId in str(resp.to_json_string()):
break
endTime = int(time.time())
timeList.append(endTime - startTime)
print("最大时间", int(max(timeList)))
print("最小时间", int(min(timeList)))
print("平均时间", int(numpy.mean(timeList)))
plt.figure()
plt.subplot(2, 1, 1)
x_data = range(0, len(timeList))
plt.plot(x_data, timeList)
plt.subplot(2, 1, 2)
plt.hist(timeList, bins=20)
plt.show()
except TencentCloudSDKException as err:
print(err)
这是比较差的一段代码,耗时很久,可以考虑加入队列,一方面多进程在队列面加入执行的 RequestId,一方面消费 RequestId,进入到获取 Logs 的对象中,速度可以大大提升。但是无论如何,运行结果如下:
最大时间 31
最小时间 0
平均时间 17
通过这个结果,我们发现日志输出有两个问题:
时间频率不固定,通过数据可以看到,快的话可能几秒就出结果,慢的话可能十几秒,二十几秒,甚至三十几秒;
日志普遍输出速度很慢,会严重影响定位问题;
就目前的腾讯云 Serverless 架构而言,如果要在本地开发一个项目,并在本地进行了初步的调试,就算一切正常,也并不能保证在线上完全可用,尤其在复杂的触发器环境下以及复杂的对象复用、内网资源使用的前提下,本地调试的难度非常大,很难完整模拟出线上的环境。
以 API 网关触发器为例,当本地写完代码,调试完成部署线上,通过 API 网关触发一次,发现函数代码不能正常运行,这个时候的第一想法是什么?查看日志,看一下打印的日志有哪些问题,是不是通过日志可以判断出问题。很遗憾的告诉你,你可能要等几秒钟,十几秒钟,甚至二十几秒,三十秒。
自建日志输出功能
通过刚才的分析,我们可以知道,在线上触发函数的时候,日志入库的速度非常缓慢,而且极其不稳定,一定条件下会严重影响开发进度以及问题定位的进度。为了解决这个问题,我们可以通过 Serverless 架构,封装一套实时日志功能:
在这个操作过程中,主要使用一个 API 网关作为 Websocket 与客户端建立链接,三个函数(注册函数,上报函数,清理函数)与 API 搭配使用,存储桶作为部分资源的临时存储。
整个流程大概可以描述为:
客户端决定开启实时日志,并将要监控的函数信息(包括地域,命名空间,函数名)作为参数,与 API 网关建立 Websocket 链接;
API 网关建立 Websocket 链接的时候,会触发注册函数,此时注册函数会将 RequestId(ConnectionId)与函数信息以 Key-Value 存储到对象存储中;
根据函数信息找到对应的函数,将回推地址以及 ConnectionId 写到函数环境变量中;
此时函数只要被触发,就会先读取环境变量,根据环境变量决定是否将函数日志上报到指定地址(即带着 connectionId 发送到回推地址);
上报函数收到业务函数传递过来的数据,将数据发送到指定的 ConnectionId 的客户端,实现实时日志的输出;
当客户端断开连接之后,会触发清理函数;
清理函数会清理掉业务函数中的回推地址和 ConnectionId 等信息,清理之后,业务函数再被触发,则会因为读取不到该参数,而不会上报数据;
将根据 RequestId(ConnectionId)从对象存储删除,至此完成一次日志实时输出功能;
由于腾讯云的 API 网关限制,所以该功能每次最长只能执行 900s,900s 之后需要重新执行该程序。
API 网关涉及到的三个函数:
注册函数:主要用来完成数据存储和函数信息修改等操作,是用户建立链接时触发的函数;
# -*- coding: utf8 -*-
import json, os
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
from tencentcloud.common import credential
from tencentcloud.scf.v20180416 import scf_client, models
def setFunction2Bucket(name, namespace, secretId, secretKey, token, connid):
region = os.environ.get("bucket_region")
config = CosConfig(Region=region, SecretId=secretId, SecretKey=secretKey, Token=token)
client = CosS3Client(config)
response = client.put_object(
Bucket=os.environ.get("bucket"),
Body=json.dumps({
"region": region,
"namespace": namespace,
"function": name
}).encode("utf-8"),
Key=connid,
EnableMD5=False
)
return response
def setFunctionConfigure(name, namespace, region, secreetId, secretKey, token, connid, transurl):
try:
environmentVariablesList = [
{
"Key": "real_time_log_id",
"Value": connid
},
{
"Key": "real_time_log_url",
"Value": transurl
},
{
"Key": "real_time_log",
"Value": "open"
}
]
cred = credential.Credential(secreetId, secretKey, token=token)
client = scf_client.ScfClient(cred, region)
req = models.GetFunctionRequest()
req.from_json_string(json.dumps({"FunctionName": name, "Namespace": namespace, "ShowCode": "FALSE"}))
resp = client.GetFunction(req)
environmentVariables = json.loads(resp.to_json_string())["Environment"]["Variables"]
for eveVariables in environmentVariables:
if eveVariables["Key"] == "real_time_log_id" or eveVariables["Key"] == "real_time_log_url" or eveVariables["Key"] == "real_time_log":
continue
environmentVariablesList.append(eveVariables)
req = models.UpdateFunctionConfigurationRequest()
req.from_json_string(json.dumps({"FunctionName": name,
"Environment": {
"Variables": environmentVariablesList
},
"Namespace": namespace}))
client.UpdateFunctionConfiguration(req)
setFunction2Bucket(name, namespace, secreetId, secretKey, token, connid)
return True
except Exception as e:
print(e)
return False
def main_handler(event, context):
print("event is: ", event)
connectionID = event['websocket']['secConnectionID']
if not setFunctionConfigure(
event['queryString']['name'],
event['queryString']['namespace'],
event['queryString']['region'],
os.environ.get("TENCENTCLOUD_SECRETID"),
os.environ.get("TENCENTCLOUD_SECRETKEY"),
os.environ.get("TENCENTCLOUD_SESSIONTOKEN"),
connectionID,
os.environ.get("url")
):
return False
if 'requestContext' not in event.keys():
return {"errNo": 101, "errMsg": "not found request context"}
if 'websocket' not in event.keys():
return {"errNo": 102, "errMsg": "not found web socket"}
retmsg = {}
retmsg['errNo'] = 0
retmsg['errMsg'] = "ok"
retmsg['websocket'] = {
"action": "connecting",
"secConnectionID": connectionID
}
if "secWebSocketProtocol" in event['websocket'].keys():
retmsg['websocket']['secWebSocketProtocol'] = event['websocket']['secWebSocketProtocol']
if "secWebSocketExtensions" in event['websocket'].keys():
ext = event['websocket']['secWebSocketExtensions']
retext = []
exts = ext.split(";")
print(exts)
for e in exts:
e = e.strip(" ")
if e == "permessage-deflate":
pass
if e == "client_max_window_bits":
pass
retmsg['websocket']['secWebSocketExtensions'] = ";".join(retext)
print("connecting: connection id:%s" % event['websocket']['secConnectionID'])
return retmsg
上报函数:用户开启实时日志成功之后,业务函数上报数据。
# -*- coding: utf8 -*-
import os
import json
import requests
def main_handler(event, context):
try:
print("event is: ", event)
body = json.loads(event["body"])
url = os.environ.get("url")
retmsg = {}
retmsg['websocket'] = {}
retmsg['websocket']['action'] = "data send"
retmsg['websocket']['secConnectionID'] = body["coid"]
retmsg['websocket']['dataType'] = 'text'
retmsg['websocket']['data'] = body["data"]
print(retmsg)
requests.post(url, json=retmsg)
return True
except Exception as e:
return False
清理函数:客户端关闭链接时触发的函数,部分操作是注册函数的逆操作。
# -*- coding: utf8 -*-
import json, os
import requests
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client
from tencentcloud.common import credential
from tencentcloud.scf.v20180416 import scf_client, models
def setFunctionConfigure(name, namespace, region, secreetId, secretKey, token):
try:
environmentVariablesList = [{
"Key": "real_time_log",
"Value": "close"
}]
cred = credential.Credential(secreetId, secretKey, token=token)
client = scf_client.ScfClient(cred, region)
req = models.GetFunctionRequest()
params = json.dumps({"FunctionName": name, "Namespace": namespace, "ShowCode": "FALSE"})
req.from_json_string(params)
resp = client.GetFunction(req)
environmentVariables = json.loads(resp.to_json_string())["Environment"]["Variables"]
for eveVariables in environmentVariables:
if eveVariables["Key"] == "real_time_log_id" or eveVariables["Key"] == "real_time_log_url" or eveVariables["Key"] == "real_time_log":
continue
environmentVariablesList.append(eveVariables)
print(environmentVariablesList)
req = models.UpdateFunctionConfigurationRequest()
params = json.dumps({"FunctionName": name,
"Environment": {
"Variables": environmentVariablesList
},
"Namespace": namespace})
req.from_json_string(params)
resp = client.UpdateFunctionConfiguration(req)
print(resp.to_json_string())
return True
except Exception as e:
print(e)
return False
def main_handler(event, context):
print("event is: ", event)
connectionID = event['websocket']['secConnectionID']
region = os.environ.get("bucket_region")
secreetId = os.environ.get("TENCENTCLOUD_SECRETID")
secretKey = os.environ.get("TENCENTCLOUD_SECRETKEY")
token = os.environ.get("TENCENTCLOUD_SESSIONTOKEN")
config = CosConfig(Region=region, SecretId=secreetId, SecretKey=secretKey, Token=token)
client = CosS3Client(config)
response = client.get_object(
Bucket=os.environ.get("bucket"),
Key=connectionID,
)
response['Body'].get_stream_to_file('/tmp/connid.json')
with open('/tmp/connid.json') as f:
data = json.loads(f.read())
if not setFunctionConfigure(
data["function"],
data["namespace"],
data["region"],
secreetId,
secretKey,
token,
):
return False
retmsg = {}
retmsg['websocket'] = {}
retmsg['websocket']['action'] = "closing"
retmsg['websocket']['secConnectionID'] = connectionID
requests.post(os.environ.get("url"), json=retmsg)
return retmsg
业务函数上报数据的逻辑,实际上就是修改常见组件的日志方法,以 Python 为例,例如重写print()
方法以及logging
组件:
重写print()
:
# -*- coding: utf8 -*-
import os
import sys
import json
import urllib.parse
import urllib.request
def print(*args):
url = os.environ.get("real_time_log_url")
cid = os.environ.get("real_time_log_id")
if url and cid and os.environ.get("real_time_log_id", None):
try:
retmsg = {
"coid": cid,
"data": " ".join([str(eveObject) for eveObject in args])
}
urllib.request.urlopen(
urllib.request.Request(
url=url,
data=json.dumps(retmsg).encode("utf-8")
)
)
except Exception as e:
sys.stdout.write("Debug Error:" + str(e))
sys.stdout.write("aaa"+ " ".join([str(eveObject) for eveObject in args]) + "\n")
对logging
进行额外的处理,将文件中的log
/info
…等接口增加上报逻辑,例如:
def warning(msg, *args, **kwargs):
"""
Log a message with severity 'WARNING' on the root logger. If the logger has
no handlers, call basicConfig() to add a console handler with a pre-defined
format.
"""
realTimeLogs("WARNING %s %s"%(str(msg), " ".join([str(eveObject) for eveObject in args])))
if len(root.handlers) == 0:
basicConfig()
root.warning(msg, *args, **kwargs)
上报逻辑:
def realTimeLogs(data):
url = os.environ.get("real_time_log_url")
cid = os.environ.get("real_time_log_id")
if url and cid and os.environ.get("real_time_log_id", None):
try:
retmsg = {
"coid": cid,
"data": data
}
urllib.request.urlopen(
urllib.request.Request(
url=url,
data=json.dumps(retmsg).encode("utf-8")
)
)
except Exception as e:
sys.stdout.write("Debug Error:" + str(e))
封装成工具
将重写部分封装成客户端工具
将线上函数部分封装成 Component
封装成工具后的整体使用流程:
组件的安装与配置
安装
scflog
:
npm install scflog
部署实时日志组件,新建项目,并且建立
serverless.yaml
,内容:
PythonLogs:
component: '@gosls/tencent-pythonlogs'
inputs:
region: ap-guangzhou
通过sls --debug
部署:
DEBUG ─ Setting tags for function PythonRealTimeLogs_Cleanup
DEBUG ─ Creating trigger for function PythonRealTimeLogs_Cleanup
DEBUG ─ Deployed function PythonRealTimeLogs_Cleanup successful
PythonLogs:
websocket: ws://service-laabz6zm-1256773370.gz.apigw.tencentcs.com/test/python_real_time_logs
26s › PythonLogs › done
配置组件:
scflog set -w ws://service-laabz6zm-1256773370.gz.apigw.tencentcs.com/test/python_real_time_logs
配置成功输出:
DFOUNDERLIU-MB0:~ dfounderliu$ scflog set -w ws://service-laabz6zm-1256773370.gz.apigw.tencentcs.com/test/python_real_time_logs
设置成功
websocket: ws://service-laabz6zm-1256773370.gz.apigw.tencentcs.com/test/python_real_time_logs
region: ap-guangzhou
namespace: default
函数的初始化与部署
在项目中使用该组件的方法很简单。
创建一个文件夹,并进入
mkdir scflogs && cd scflogs
初始化项目
scflog init -l python
创建
index.py
文件以及serverless.yaml
文件:
vim index.py
内容是:
from logs import *
import time
import logging
def main_handler(event, context):
print("event is: ", event)
time.sleep(1)
logging.debug("this is debug_msg")
time.sleep(1)
logging.info("this is info_msg")
time.sleep(1)
logging.warning("this is warning_msg")
time.sleep(1)
logging.error("this is error_msg")
time.sleep(1)
logging.critical("this is critical_msg")
time.sleep(1)
print("context is: ", event)
return "hello world"
vim serverless.yaml
内容是:
Hello_World:
component: "@serverless/tencent-scf"
inputs:
name: Hello_World
codeUri: ./
handler: index.main_handler
runtime: Python3.6
region: ap-guangzhou
description: My Serverless Function
memorySize: 64
timeout: 20
exclude:
- .gitignore
- .git/**
- node_modules/**
- .serverless
- .env
events:
- apigw:
name: serverless
parameters:
protocols:
- http
serviceName: serverless
description: the serverless service
environment: release
endpoints:
- path: /test
method: ANY
通过sls --debug
部署:
DEBUG ─ Deployed function Hello_World successful
Hello_World:
Name: Hello_World
Runtime: Python3.6
Handler: index.main_handler
MemorySize: 64
Timeout: 20
Region: ap-guangzhou
Namespace: default
Description: My Serverless Function
APIGateway:
- serverless - http://service-89bjzrye-1256773370.gz.apigw.tencentcs.com/release
30s › Hello_World › done
实时日志功能的测试
配置 APIGW 的触发器,地址是上面输出的地址 + endpoints 中的 path:
http://service-89bjzrye-1256773370.gz.apigw.tencentcs.com/release/test
打开实时日志:
scflog logs -n Hello_World -r ap-guangzhou
提醒实时日志开启成功:
DFOUNDERLIU-MB0:~ dfounderliu$ scflog logs -n Hello_World -r ap-guangzhou
实时日志开启 ...
用浏览器通过刚才函数部署完成返回的地址触发函数:
实时日志开启 ...
[2020-03-04 16:36:08] : ......}
[2020-03-04 16:36:09] : DEBUG debug_msg
[2020-03-04 16:36:10] : INFO info_msg
[2020-03-04 16:36:11] : WARNING warning_msg
[2020-03-04 16:36:14] : ERROR error_msg
[2020-03-04 16:36:14] : CRITICAL critical_msg
[2020-03-04 16:36:16] : context is: .......}
.......
至此,实现实时日志功能。
总结
Serverless 架构虽然拥有很多优势,但是同时也有劣势,没有什么事情是完美的,Serverless 架构也是如此。在 Serverless 架构下,日志的实时性确实是一个问题,这个问题不仅仅是我们可能要等十几秒才能看到日志,而且会影响开发效率、维护效率以及问题定位效率,但是我们可以通过自身来实现这样的功能,通过 API 网关的 Websocket 能力,通过云函数的与 API 网关的结合,构建一个实时日志的系统。
更多内容推荐
自建图床应用,我只推荐 Serverless
调研了十款图床,我打算自己建一个
实战 | 基于 Serverless 技术的视频截帧架构如何实现?
视频直播是一种创新的在线娱乐形式,具有多人实时交互特性,在电商、游戏、在线教育、娱乐等多个行业都有着非常广泛的应用。
28|认证机制:Flask 认证机制设计与实现
今天这节课,我们将一起进入项目实战环节,巩固一下你对Flask认证机制的应用能力。
2023-06-26
Serverless ETL —— 蘑菇街实战落地
轻量、快速、灵活、省心、低成本的 Serverless 通用数据处理解决方案!
20 行代码:Serverless 架构下用 Python 轻松搞定图像分类和预测
本文将会通过一个有趣的 Python 库,快速将图像分类的功能搭建在云函数上,并且和 API 网关结合,对外提供 API 功能,实现一个 Serverless 架构的“图像分类 API”。
2021-01-08
不改一行代码!快速迁移 Express 应用上云
快来一起实践吧!
Gitee x 腾讯云 Serverless,实现 Web 框架快速上云
Gitee 与 腾讯云 Serverless 会碰撞出什么火花,一起看看吧!
14. Middleware:Trace 简介和 OpenTelemetry
2023-09-26
20 行代码:Serverless 架构下用 Python 轻松搞定图像分类和预测
本文将会通过一个有趣的 Python 库,快速将图像分类的功能搭建在云函数上,并且和 API 网关结合,对外提供 API 功能,实现一个 Serverless 架构的“图像分类 API”。
2021-01-20
不改一行代码!快速迁移 Flask 应用上云
手把手教你部署 Flask 应用,快来一起实践吧!
38|Serverless:如何基于 Serverless 架构实现流式数据处理?
和当前的开源方案相比,Serverless 在架构理念和设计上是有极大的优势,在未来也具有很大的想象空间和收益空间。
2023-09-15
Golang function
极客时间《Go 语言核心 36 讲》学习笔记 07,图片来自网络
2021-05-20
开发者:Serverless 从懵比到实战
从入门到实战,这份教程请收好!
不改一行代码!快速迁移 Koa 应用上云
手把手教你部署 Koa 应用,快来一起实践吧
17. Prometheus 详解
2023-09-26
腾讯 IMWEB 前端团队一站式 Serverless 开发解决方案
腾讯 IMWEB 前端团队一站式 Serverless 开发解决方案
27|初识认证机制:认证机制能解决哪些问题?
在项目开发中,我们单独考虑开发功能还不够,还要通过Flask认证机制保护用户隐私和数据安全。
2023-06-23
20|应用监控:如何使用日志来监控应用?
使用日志对应用监控
2023-02-22
结合阿里云 FC 谈谈我对 FaaS 的理解
本文结合阿里云 FC、Midway FaaS 框架快速创建 FaaS 应用的实践,向大家展示了什么是 FaaS,FaaS 的工作流程,优缺点,展现了 FaaS 颠覆传统开发模式的魅力。
鹅厂分布式大气监测系统:以 Serverless 为核心的云端能力如何打造?
以 Serverless 为核心的云架构大气监测项目,放出源码啦!
推荐阅读
6、SpringMVC- 源码阅读之 DispatcherServlet 流程
2023-09-28
21|Web 开发(上):如何使用 Axum 框架进行 Web 后端开发?
2023-12-11
(转)大数据开发之 Hive 中 UDTF 函数
2021-12-21
DWS 临时内存不可用报错: memory temporarily unavailable
2023-10-27
Go 语言学习查缺补漏 ing Day2
2021-11-20
9.Nacos Server 处理注册流程
2023-09-29
华为云全新上线 Serverless 应用中心,支持一键构建文生图应用
2023-11-09
电子书
大厂实战PPT下载
换一换 汪晟杰 | 腾讯云 产品专家
董善东(梵登)博士 | 阿里云 技术专家
曹伟 | 杭州云猿生数据 创始人/CEO
评论