QCon 演讲火热征集中,快来分享技术实践与洞见! 了解详情
写点什么

Serverless 架构下如何实现日志的实时输出?

  • 2020-07-31
  • 本文字数:9705 字

    阅读完需:约 32 分钟

Serverless架构下如何实现日志的实时输出?

Serverless 白皮书中曾描述过 Serverless 的一些缺点,例如难以调试、冷启动严重等等。其中难以调试是表现在多个方面的,有一个方面是日志输出。


当我们把 Serverless 架构应用于实际项目,就会发现调试成为了效率的重要影响因素。以日志输出为例,某个函数被触发之后未得到预期结果,大家第一想法就是查看日志,但这时输出的日志可能并未是我们想要的,而且云厂商输出日志的延时也非常高。

日志输出现状

以腾讯云云函数为例,我们可以看一下其日志输出情况:


  • 通过控制台或者是云 API 的 Invoke 接口触发云函数:



通过这个测试功能,可以很快获取到函数的结果,并查看日志信息。


  • 通过 API 网关、COS 等触发云函数,此处以 API 网关为例:


通过网关触发一个函数:



通过函数日志查看何时会刷出这个日志:



这个过程大概有 11S,通过代码来进行更加详细的测试:


import json,timefrom tencentcloud.common import credentialfrom tencentcloud.common.profile.client_profile import ClientProfilefrom tencentcloud.common.profile.http_profile import HttpProfilefrom tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKExceptionfrom tencentcloud.scf.v20180416 import scf_client, modelstry:    cred = credential.Credential("", "")    httpProfile = HttpProfile()    httpProfile.endpoint = "scf.tencentcloudapi.com"
clientProfile = ClientProfile() clientProfile.httpProfile = httpProfile client = scf_client.ScfClient(cred, "ap-guangzhou", clientProfile)
req = models.InvokeRequest() params = '{"FunctionName":"test"}' req.from_json_string(params)
resp = client.Invoke(req) functionRequestId = json.loads(resp.to_json_string())["Result"][ "FunctionRequestId"]
print(time.time(), functionRequestId)
while True: time.sleep(0.2) req = models.GetFunctionLogsRequest() params = '{"FunctionName":"test"}' req.from_json_string(params)
resp = client.GetFunctionLogs(req) if functionRequestId in str(resp.to_json_string()): break
print(time.time())

except TencentCloudSDKException as err: print(err)

复制代码


输出结果:


1584108001.141546 ee7243dd-6532-11ea-8bce-5254000c8aa41584108005.2496068
复制代码


这次输出结果是 4S,再做一个多次调用的时间对比图:


import jsonimport timeimport numpyimport matplotlib.pyplot as pltfrom tencentcloud.common import credentialfrom tencentcloud.common.profile.client_profile import ClientProfilefrom tencentcloud.common.profile.http_profile import HttpProfilefrom tencentcloud.common.exception.tencent_cloud_sdk_exception import TencentCloudSDKExceptionfrom tencentcloud.scf.v20180416 import scf_client, models
try: cred = credential.Credential("", "") httpProfile = HttpProfile() httpProfile.endpoint = "scf.tencentcloudapi.com"
clientProfile = ClientProfile() clientProfile.httpProfile = httpProfile client = scf_client.ScfClient(cred, "ap-guangzhou", clientProfile)
timeList = [] for i in range(0, 100): req = models.InvokeRequest() params = '{"FunctionName":"test"}' req.from_json_string(params)
resp = client.Invoke(req) functionRequestId = json.loads(resp.to_json_string())["Result"]["FunctionRequestId"]
startTime = int(time.time())
while True: time.sleep(0.2) req = models.GetFunctionLogsRequest() params = '{"FunctionName":"test"}' req.from_json_string(params)
resp = client.GetFunctionLogs(req) if functionRequestId in str(resp.to_json_string()): break
endTime = int(time.time()) timeList.append(endTime - startTime)
print("最大时间", int(max(timeList))) print("最小时间", int(min(timeList))) print("平均时间", int(numpy.mean(timeList)))
plt.figure() plt.subplot(2, 1, 1) x_data = range(0, len(timeList)) plt.plot(x_data, timeList) plt.subplot(2, 1, 2) plt.hist(timeList, bins=20) plt.show()

except TencentCloudSDKException as err: print(err)

复制代码


这是比较差的一段代码,耗时很久,可以考虑加入队列,一方面多进程在队列面加入执行的 RequestId,一方面消费 RequestId,进入到获取 Logs 的对象中,速度可以大大提升。但是无论如何,运行结果如下:


最大时间 31最小时间 0平均时间 17
复制代码



通过这个结果,我们发现日志输出有两个问题:


  • 时间频率不固定,通过数据可以看到,快的话可能几秒就出结果,慢的话可能十几秒,二十几秒,甚至三十几秒;

  • 日志普遍输出速度很慢,会严重影响定位问题;


就目前的腾讯云 Serverless 架构而言,如果要在本地开发一个项目,并在本地进行了初步的调试,就算一切正常,也并不能保证在线上完全可用,尤其在复杂的触发器环境下以及复杂的对象复用、内网资源使用的前提下,本地调试的难度非常大,很难完整模拟出线上的环境。


以 API 网关触发器为例,当本地写完代码,调试完成部署线上,通过 API 网关触发一次,发现函数代码不能正常运行,这个时候的第一想法是什么?查看日志,看一下打印的日志有哪些问题,是不是通过日志可以判断出问题。很遗憾的告诉你,你可能要等几秒钟,十几秒钟,甚至二十几秒,三十秒。

自建日志输出功能

通过刚才的分析,我们可以知道,在线上触发函数的时候,日志入库的速度非常缓慢,而且极其不稳定,一定条件下会严重影响开发进度以及问题定位的进度。为了解决这个问题,我们可以通过 Serverless 架构,封装一套实时日志功能:



在这个操作过程中,主要使用一个 API 网关作为 Websocket 与客户端建立链接,三个函数(注册函数,上报函数,清理函数)与 API 搭配使用,存储桶作为部分资源的临时存储。


整个流程大概可以描述为:


  1. 客户端决定开启实时日志,并将要监控的函数信息(包括地域,命名空间,函数名)作为参数,与 API 网关建立 Websocket 链接;

  2. API 网关建立 Websocket 链接的时候,会触发注册函数,此时注册函数会将 RequestId(ConnectionId)与函数信息以 Key-Value 存储到对象存储中;

  3. 根据函数信息找到对应的函数,将回推地址以及 ConnectionId 写到函数环境变量中;

  4. 此时函数只要被触发,就会先读取环境变量,根据环境变量决定是否将函数日志上报到指定地址(即带着 connectionId 发送到回推地址);

  5. 上报函数收到业务函数传递过来的数据,将数据发送到指定的 ConnectionId 的客户端,实现实时日志的输出;

  6. 当客户端断开连接之后,会触发清理函数;

  7. 清理函数会清理掉业务函数中的回推地址和 ConnectionId 等信息,清理之后,业务函数再被触发,则会因为读取不到该参数,而不会上报数据;

  8. 将根据 RequestId(ConnectionId)从对象存储删除,至此完成一次日志实时输出功能;


由于腾讯云的 API 网关限制,所以该功能每次最长只能执行 900s,900s 之后需要重新执行该程序。


API 网关涉及到的三个函数:


  • 注册函数:主要用来完成数据存储和函数信息修改等操作,是用户建立链接时触发的函数;


# -*- coding: utf8 -*-
import json, osfrom qcloud_cos_v5 import CosConfigfrom qcloud_cos_v5 import CosS3Clientfrom tencentcloud.common import credentialfrom tencentcloud.scf.v20180416 import scf_client, models

def setFunction2Bucket(name, namespace, secretId, secretKey, token, connid): region = os.environ.get("bucket_region") config = CosConfig(Region=region, SecretId=secretId, SecretKey=secretKey, Token=token) client = CosS3Client(config) response = client.put_object( Bucket=os.environ.get("bucket"), Body=json.dumps({ "region": region, "namespace": namespace, "function": name }).encode("utf-8"), Key=connid, EnableMD5=False ) return response

def setFunctionConfigure(name, namespace, region, secreetId, secretKey, token, connid, transurl): try: environmentVariablesList = [ { "Key": "real_time_log_id", "Value": connid }, { "Key": "real_time_log_url", "Value": transurl }, { "Key": "real_time_log", "Value": "open" } ] cred = credential.Credential(secreetId, secretKey, token=token) client = scf_client.ScfClient(cred, region)
req = models.GetFunctionRequest() req.from_json_string(json.dumps({"FunctionName": name, "Namespace": namespace, "ShowCode": "FALSE"})) resp = client.GetFunction(req) environmentVariables = json.loads(resp.to_json_string())["Environment"]["Variables"] for eveVariables in environmentVariables: if eveVariables["Key"] == "real_time_log_id" or eveVariables["Key"] == "real_time_log_url" or eveVariables["Key"] == "real_time_log": continue environmentVariablesList.append(eveVariables)
req = models.UpdateFunctionConfigurationRequest() req.from_json_string(json.dumps({"FunctionName": name, "Environment": { "Variables": environmentVariablesList }, "Namespace": namespace})) client.UpdateFunctionConfiguration(req)
setFunction2Bucket(name, namespace, secreetId, secretKey, token, connid) return True except Exception as e: print(e) return False

def main_handler(event, context): print("event is: ", event)
connectionID = event['websocket']['secConnectionID'] if not setFunctionConfigure( event['queryString']['name'], event['queryString']['namespace'], event['queryString']['region'], os.environ.get("TENCENTCLOUD_SECRETID"), os.environ.get("TENCENTCLOUD_SECRETKEY"), os.environ.get("TENCENTCLOUD_SESSIONTOKEN"), connectionID, os.environ.get("url") ): return False
if 'requestContext' not in event.keys(): return {"errNo": 101, "errMsg": "not found request context"} if 'websocket' not in event.keys(): return {"errNo": 102, "errMsg": "not found web socket"}
retmsg = {} retmsg['errNo'] = 0 retmsg['errMsg'] = "ok" retmsg['websocket'] = { "action": "connecting", "secConnectionID": connectionID }
if "secWebSocketProtocol" in event['websocket'].keys(): retmsg['websocket']['secWebSocketProtocol'] = event['websocket']['secWebSocketProtocol'] if "secWebSocketExtensions" in event['websocket'].keys(): ext = event['websocket']['secWebSocketExtensions'] retext = [] exts = ext.split(";") print(exts) for e in exts: e = e.strip(" ") if e == "permessage-deflate": pass if e == "client_max_window_bits": pass retmsg['websocket']['secWebSocketExtensions'] = ";".join(retext)
print("connecting: connection id:%s" % event['websocket']['secConnectionID']) return retmsg

复制代码


  • 上报函数:用户开启实时日志成功之后,业务函数上报数据。


# -*- coding: utf8 -*-import osimport jsonimport requests

def main_handler(event, context): try: print("event is: ", event)
body = json.loads(event["body"])
url = os.environ.get("url")
retmsg = {} retmsg['websocket'] = {} retmsg['websocket']['action'] = "data send" retmsg['websocket']['secConnectionID'] = body["coid"] retmsg['websocket']['dataType'] = 'text' retmsg['websocket']['data'] = body["data"] print(retmsg) requests.post(url, json=retmsg)
return True except Exception as e: return False

复制代码


  • 清理函数:客户端关闭链接时触发的函数,部分操作是注册函数的逆操作。


# -*- coding: utf8 -*-
import json, osimport requestsfrom qcloud_cos_v5 import CosConfigfrom qcloud_cos_v5 import CosS3Clientfrom tencentcloud.common import credentialfrom tencentcloud.scf.v20180416 import scf_client, models

def setFunctionConfigure(name, namespace, region, secreetId, secretKey, token): try: environmentVariablesList = [{ "Key": "real_time_log", "Value": "close" }] cred = credential.Credential(secreetId, secretKey, token=token) client = scf_client.ScfClient(cred, region)
req = models.GetFunctionRequest() params = json.dumps({"FunctionName": name, "Namespace": namespace, "ShowCode": "FALSE"}) req.from_json_string(params)
resp = client.GetFunction(req) environmentVariables = json.loads(resp.to_json_string())["Environment"]["Variables"]
for eveVariables in environmentVariables: if eveVariables["Key"] == "real_time_log_id" or eveVariables["Key"] == "real_time_log_url" or eveVariables["Key"] == "real_time_log": continue environmentVariablesList.append(eveVariables)
print(environmentVariablesList) req = models.UpdateFunctionConfigurationRequest() params = json.dumps({"FunctionName": name, "Environment": { "Variables": environmentVariablesList }, "Namespace": namespace}) req.from_json_string(params)
resp = client.UpdateFunctionConfiguration(req) print(resp.to_json_string()) return True except Exception as e: print(e) return False

def main_handler(event, context): print("event is: ", event)
connectionID = event['websocket']['secConnectionID']
region = os.environ.get("bucket_region") secreetId = os.environ.get("TENCENTCLOUD_SECRETID") secretKey = os.environ.get("TENCENTCLOUD_SECRETKEY") token = os.environ.get("TENCENTCLOUD_SESSIONTOKEN") config = CosConfig(Region=region, SecretId=secreetId, SecretKey=secretKey, Token=token) client = CosS3Client(config) response = client.get_object( Bucket=os.environ.get("bucket"), Key=connectionID, ) response['Body'].get_stream_to_file('/tmp/connid.json') with open('/tmp/connid.json') as f: data = json.loads(f.read())
if not setFunctionConfigure( data["function"], data["namespace"], data["region"], secreetId, secretKey, token, ): return False
retmsg = {} retmsg['websocket'] = {} retmsg['websocket']['action'] = "closing" retmsg['websocket']['secConnectionID'] = connectionID requests.post(os.environ.get("url"), json=retmsg) return retmsg

复制代码


业务函数上报数据的逻辑,实际上就是修改常见组件的日志方法,以 Python 为例,例如重写print()方法以及logging组件:


重写print()


# -*- coding: utf8 -*-
import osimport sysimport jsonimport urllib.parseimport urllib.request

def print(*args): url = os.environ.get("real_time_log_url") cid = os.environ.get("real_time_log_id") if url and cid and os.environ.get("real_time_log_id", None): try: retmsg = { "coid": cid, "data": " ".join([str(eveObject) for eveObject in args]) } urllib.request.urlopen( urllib.request.Request( url=url, data=json.dumps(retmsg).encode("utf-8") ) ) except Exception as e: sys.stdout.write("Debug Error:" + str(e)) sys.stdout.write("aaa"+ " ".join([str(eveObject) for eveObject in args]) + "\n")
复制代码


logging进行额外的处理,将文件中的log/info…等接口增加上报逻辑,例如:


def warning(msg, *args, **kwargs):    """    Log a message with severity 'WARNING' on the root logger. If the logger has    no handlers, call basicConfig() to add a console handler with a pre-defined    format.    """    realTimeLogs("WARNING %s %s"%(str(msg), " ".join([str(eveObject) for eveObject in args])))    if len(root.handlers) == 0:        basicConfig()    root.warning(msg, *args, **kwargs)
复制代码


上报逻辑:


def realTimeLogs(data):    url = os.environ.get("real_time_log_url")    cid = os.environ.get("real_time_log_id")    if url and cid and os.environ.get("real_time_log_id", None):        try:            retmsg = {                "coid": cid,                "data": data            }            urllib.request.urlopen(                urllib.request.Request(                    url=url,                    data=json.dumps(retmsg).encode("utf-8")                )            )        except Exception as e:            sys.stdout.write("Debug Error:" + str(e))
复制代码

封装成工具

  • 将重写部分封装成客户端工具

  • 将线上函数部分封装成 Component


封装成工具后的整体使用流程:

组件的安装与配置

  • 安装scflog


npm install scflog
复制代码


  • 部署实时日志组件,新建项目,并且建立serverless.yaml,内容:


PythonLogs:  component: '@gosls/tencent-pythonlogs'  inputs:    region: ap-guangzhou
复制代码


通过sls --debug部署:


DEBUG ─ Setting tags for function PythonRealTimeLogs_CleanupDEBUG ─ Creating trigger for function PythonRealTimeLogs_CleanupDEBUG ─ Deployed function PythonRealTimeLogs_Cleanup successful
PythonLogs: websocket: ws://service-laabz6zm-1256773370.gz.apigw.tencentcs.com/test/python_real_time_logs 26s › PythonLogs › done

复制代码


配置组件:


scflog set -w ws://service-laabz6zm-1256773370.gz.apigw.tencentcs.com/test/python_real_time_logs
复制代码


配置成功输出:


DFOUNDERLIU-MB0:~ dfounderliu$ scflog set -w ws://service-laabz6zm-1256773370.gz.apigw.tencentcs.com/test/python_real_time_logs设置成功  websocket: ws://service-laabz6zm-1256773370.gz.apigw.tencentcs.com/test/python_real_time_logs  region: ap-guangzhou  namespace: default
复制代码

函数的初始化与部署

在项目中使用该组件的方法很简单。


  • 创建一个文件夹,并进入


mkdir scflogs && cd scflogs


  • 初始化项目


scflog init -l python


  • 创建index.py文件以及serverless.yaml文件:


vim index.py
复制代码


内容是:


from logs import *import timeimport logging
def main_handler(event, context): print("event is: ", event) time.sleep(1) logging.debug("this is debug_msg") time.sleep(1) logging.info("this is info_msg") time.sleep(1) logging.warning("this is warning_msg") time.sleep(1) logging.error("this is error_msg") time.sleep(1) logging.critical("this is critical_msg") time.sleep(1) print("context is: ", event) return "hello world"

复制代码


vim serverless.yaml
复制代码


内容是:


Hello_World:  component: "@serverless/tencent-scf"  inputs:    name: Hello_World    codeUri: ./    handler: index.main_handler    runtime: Python3.6    region: ap-guangzhou    description: My Serverless Function    memorySize: 64    timeout: 20    exclude:      - .gitignore      - .git/**      - node_modules/**      - .serverless      - .env    events:      - apigw:          name: serverless          parameters:            protocols:              - http            serviceName: serverless            description: the serverless service            environment: release            endpoints:              - path: /test                method: ANY

复制代码


通过sls --debug部署:


DEBUG ─ Deployed function Hello_World successful
Hello_World: Name: Hello_World Runtime: Python3.6 Handler: index.main_handler MemorySize: 64 Timeout: 20 Region: ap-guangzhou Namespace: default Description: My Serverless Function APIGateway: - serverless - http://service-89bjzrye-1256773370.gz.apigw.tencentcs.com/release
30s › Hello_World › done

复制代码

实时日志功能的测试

配置 APIGW 的触发器,地址是上面输出的地址 + endpoints 中的 path:


http://service-89bjzrye-1256773370.gz.apigw.tencentcs.com/release/test
复制代码


打开实时日志:


scflog logs -n Hello_World -r ap-guangzhou
复制代码


提醒实时日志开启成功:


DFOUNDERLIU-MB0:~ dfounderliu$ scflog logs -n Hello_World -r ap-guangzhou实时日志开启 ... 
复制代码


用浏览器通过刚才函数部署完成返回的地址触发函数:


实时日志开启 ... [2020-03-04 16:36:08] :  ......}[2020-03-04 16:36:09] :  DEBUG debug_msg [2020-03-04 16:36:10] :  INFO info_msg [2020-03-04 16:36:11] :  WARNING warning_msg [2020-03-04 16:36:14] :  ERROR error_msg [2020-03-04 16:36:14] :  CRITICAL critical_msg [2020-03-04 16:36:16] :  context is: .......}.......
复制代码


至此,实现实时日志功能。

总结

Serverless 架构虽然拥有很多优势,但是同时也有劣势,没有什么事情是完美的,Serverless 架构也是如此。在 Serverless 架构下,日志的实时性确实是一个问题,这个问题不仅仅是我们可能要等十几秒才能看到日志,而且会影响开发效率、维护效率以及问题定位效率,但是我们可以通过自身来实现这样的功能,通过 API 网关的 Websocket 能力,通过云函数的与 API 网关的结合,构建一个实时日志的系统。


2020-07-31 16:332558

评论

发布
暂无评论
发现更多内容

Android C/C++层hook和java层hook原理以及比较

云智慧AIOps社区

Java android 开发技能 hook

Serverless 让我们的运维更轻松

领创集团Advance Intelligence Group

#Serverless

火遍全网的MBTI人格测试,为什么会有那么多人相信?

小炮

MBTI

国产化云平台如何实现多云管控,黄河云来“打样儿”

BoCloud博云

国产化 云管理平台

如何做好任务管理,手把手教你怎么做最高效的任务管理

阿里云云效

云计算 阿里云 云原生 研发团队 项目协作

什么是代码加密?基于云效 Codeup的代码仓库加密是如何实现的

阿里云云效

云计算 阿里云 代码管理 Codeup 代码加密

易周金融观点:遏制NFT金融化等打下监管良基

易观分析

NFT

在线YAML转CSV工具

入门小站

工具

优秀程序员的30种思维(29/100)

hackstoic

技术思维

过去一周热点回顾|Hoo虎符研究院 区块链简报 20220418期

区块链前沿News

虎符交易所

OceanBase 杨传辉参与数据库技术与应用发展研讨会

OceanBase 数据库

oceanbase

【ELT.ZIP】OpenHarmony啃论文俱乐部——这些小风景你不应该错过

ELT.ZIP

神经网络 OpenHarmony ELT.ZIP

高效进行接口测试,简单易懂!

Liam

测试 Jmeter Postman swagger 测试工具

以OceanBase为例,分析事务型评测基准对分布式数据库的适用性

OceanBase 数据库

分布式数据库 oceanbase

【ELT.ZIP】OpenHarmony啃论文俱乐部——浅析稀疏表示医学图像

ELT.ZIP

OpenHarmony 医学影像 稀疏矩阵 ELT.ZIP

Apache Doris (incubating) 1.0 Release 版本正式发布!

ApacheDoris

数据库 大数据 开源 OLAP apache doris

HLP分词后的文本如何在web端高亮显示

lo

前端 4月月更

TDesign 更新周报(2022 年 4 月第 3 周)

TDesign

深圳助力建设全国「数据交易」大市场,「隐私计算」技术赋能数据要素安全流通

洞见科技

Java 操作 Office:POI word 之文档信息提取

程序员架构进阶

内容审核 4月日更 文档识别 4月月更

linux之rpm命令

入门小站

Linux

阿里云代码托管平台,不限容量,免费使用

阿里云云效

云计算 阿里云 代码管理 代码托管 阿里云代码托管

用css制作旋转的立方体

云智慧AIOps社区

CSS 前端 大前端 3D css特效

【愚公系列】2022年04月 二十三种设计模式(零)-简单工厂模式(Simple Factory Pattern)

愚公搬代码

4月月更

TASKCTL 连接不到服务器的4种情况

敏捷调度TASKCTL

分布式 调度引擎 ETL 自动化运维 调度任务

安全之花如何盛开在华为云空间的每个角落?

脑极体

书单 | “阿里云数字新基建”系列丛书全家福来啦!

博文视点Broadview

图数据库|正反向边的最终一致性——TOSS 介绍

NebulaGraph

图数据库 知识图谱

博云 BeyondCMP 云管理平台 5.6 版本发布

BoCloud博云

云管理平台

物联网低代码平台常用《组件介绍》

AIRIOT

开发 物联网 平台搭建、

移动端日历组件设计与实现

CRMEB

Serverless架构下如何实现日志的实时输出?_服务革新_刘宇_InfoQ精选文章