写点什么

搭建云上日志收集分析系统(三)

  • 2019-09-26
  • 本文字数:0 字

    阅读完需:约 1 分钟

搭建云上日志收集分析系统(三)

背景说明

应用系统的日志收集与分析工作对运维来说至关重要。常见的系统解决方案中开源技术栈 ELK(Elastic Stack: Elasticsearch, Logstash, Kibana)是当前比较流行的选择。下面我们会讨论另一种构建于云原生设计的类似于 ELK 的解决方案 EKK(Amazon Elasticsearch Service, Amazon Kinesis, and Kibana)。


EKK 的优势在于组件是 AWS 托管服务,不必自己安装、运维,并且与 AWS 的其它服务轻松集成,可以很轻松的部署一套可靠、可扩展、安全、容错以及解耦和基于事件的解决方案。


传统的 Elasticsearch 中,日志数据的不断膨胀,对数据的生命周期管理越来越重要(应对此需求的新功能 ILM(index lifecycle management)在 Elasticsearch 7.0 中闪亮登场)。本文不介绍 ILM,介绍另一种解决方案:使用 Lambda 配合实现数据的轮换。

使用 Lambda 在 ES 轮换数据数据(Rotate)

1. 为 Lambda 设置 IAM Role


2. 在 Lambda 中创建个 Layer(层)

把项目需要的依赖包放到层里,方便 Lambda 的使用。



层参考: https://docs.aws.amazon.com/zh_cn/lambda/latest/dg/configuration-layers.html


参考命令:


pip3 install elasticsearch-curator -t ./python/$zip -q -r layer.zip ./python



3. 创建 Lambda: ES-Rotate



设置 Lambda 运行的 Layers


保存






设置运行代码,点击



在下面 Function code 区域更新代码,host 地址换成我们创建的 ES Cluster 地址



import json
import boto3
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection
import curator


host = 'search-ekk-log-vpfpqvgbxnom3ctwvz5evv2du4.cn-north-1.es.amazonaws.com.cn' # For example, search-my-domain.region.es.amazonaws.com
region = 'cn-north-1' # For example, us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)




def lambda_handler(event, context):
es = Elasticsearch(
hosts = [{'host': host, 'port': 443}],
http_auth = awsauth,
use_ssl = True,
verify_certs = True,
connection_class = RequestsHttpConnection
)


index_list = curator.IndexList(es)
index_list.filter_by_age(source='name', direction='older', timestring='%Y-%m-%d', unit='days', unit_count=1)
print("Found %s indices to delete" % len(index_list.indices))


# If our filtered list contains any indices, delete them.
if index_list.indices:
curator.DeleteIndices(index_list).do_action()




# TODO implement
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
}
复制代码


修改 Lambda 运行时内存和超时时间


4. 测试

定义测试用例输入参数,



因此 Lambda 测试用例,不需输入参数,可使用默认设置,



运行测试用例




在 ES 中查看



参考 https://docs.aws.amazon.com/zh_cn/elasticsearch-service/latest/developerguide/curator.html

5. 在 CloudWatch 设置定时轮换



Load to ES from S3

1. 创建 Lambad s3-to-es-bulk-by-hour

2. 设置 Layer



3. 设置 Code



Bucket 设置为 Kinesis Firehose 中设置的 bucket


Host 设置为 ES 的 endpoint


import boto3  import re  import requests  from requests_aws4auth import AWS4Auth  import json  from elasticsearch.helpers import bulk  import time  from elasticsearch import Elasticsearch, RequestsHttpConnection      region = 'cn-north-1' # e.g. us-west-1  service = 'es'  credentials = boto3.Session().get_credentials()  awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)    bucket = 'zhnc-ekk-full-log'    host = 'search-ekk-log-vpfpqvgbxnom3ctwvz5evv2du4.cn-north-1.es.amazonaws.com.cn'  type = 'log'  headers = { "Content-Type": "application/json" }    s3 = boto3.client('s3', region_name=region)    def get_all_s3_keys(bucket, prefix):  keys = []  kwargs = {'Bucket': bucket, 'Prefix':prefix}  while True:  resp = s3.list_objects_v2(**kwargs)  for obj in resp['Contents']:  keys.append(obj['Key'])  try:  kwargs['ContinuationToken'] = resp['NextContinuationToken']  except KeyError:  break  return keys      # Lambda execution starts here  def lambda_handler(event, context):    print(event)  for record in event['Records']:  msg = json.loads(json.dumps(eval(record['Sns']['Message'])))  year=msg['year']  month=msg['month']  day=msg['day']  hour=msg['hour']  index = 'apachelog-{}-{}-{}-{}'.format(year,month,day,hour)    print(index)    keys = get_all_s3_keys(bucket,"apachelog/{}/{}/{}/{}".format(year,month,day,hour))    print(len(keys));    es = Elasticsearch(  hosts = [{'host': host, 'port': 443}],  http_auth = awsauth,  use_ssl = True,  verify_certs = True,  connection_class = RequestsHttpConnection  )      for key in keys:    ACTIONS = []  obj = s3.get_object(Bucket=bucket, Key=key)  body = obj['Body'].read()  lines = body.splitlines()    for line in lines:    document = json.loads(line)  action = {  "_index": index,  "_type": type,  "_source": document  }  ACTIONS.append(action)  success, _ = bulk(es, ACTIONS, index=index, raise_on_error=True)  print('Performed %d actions' % success)
复制代码


4. 设置运行内存,超时时间

5. 测试

这个 Lambda 将被 SNS 触发,创建一个模拟 SNS 的事件,




消息 Message 设置为将要测试的事件,Lambda 会读取对应的 S3 文件到 ES 中,


消息内容为,


{
"Records": [
{
"EventSource": "aws:sns",
"EventVersion": "1.0",
"EventSubscriptionArn": "arn:aws-cn:sns:cn-north-1: 725362542198:s3-to-es-by-day:caf9a3b1-679c-4604-9b65-f15dca3b5b18",
"Sns": {
"Type": "Notification",
"MessageId": "71c6da65-49bf-5301-8270-8ff199faaa1b",
"TopicArn": "arn:aws-cn:sns:cn-north-1: 725362542198:s3-to-es-by-day",
"Subject": "None",
"Message": "{'year':'2019','month':'06','day':'30','hour':'02'}",
"Timestamp": "2019-07-02T03: 22: 45.020Z",
"SignatureVersion": "1",
"Signature": "ihaGN/JL8u/v57xEY1RTFekpUpgVukM9Ebj9IIM9Rr9KGkUMe6dO7hze7estD0yM9K0QRQAreQ5XiB0Tfj/jOCvyjL9IrRcTplQcWPzMHmVqd4C3942gduFkHyul2+lYa0DJZM46J/Yy7mihe9EfXUySf2Eyok4NsUC6WtnbyJPN17FG1t4fnEWpRwU2Yg+MLM4bJWr3sK5/6xRnUVerLlMm5tCsynybW6FQCYsVgl7SJLW6nBmbCe3v6jRMuKCNW8xptVyEAnII4h5uPVElts0IWhnE+EQG3FNFmOZmj8OLZutRadSrNFexRMZebmKwRZRD5dTaCoD5E6v6TTYGbQ==",
"SigningCertUrl": "https: //sns.cn-north-1.amazonaws.com.cn/SimpleNotificationService-3250158c6506d40f628c21ed8dad1787.pem",
"UnsubscribeUrl": "https://sns.cn-north-1.amazonaws.com.cn/?Action=Unsubscribe&SubscriptionArn=arn:aws-cn:sns:cn-north-1:725362542198:s3-to-es-by-day:caf9a3b1-679c-4604-9b65-f15dca3b5b18",
"MessageAttributes": {}
}
}
]
}
复制代码


此事例将读取以下位置的日志文件



点击测试



ES Index 已经导入



作者简介


陈朕,AWS 解决方案架构师,负责基于 AWS 云计算方案架构的咨询和设计,在国内推广 AWS 云平台技术和各种解决方案。十余年分布式应用、大数据的分布式处理经验。


本文转载自 AWS 技术博客


原文链接:


https://amazonaws-china.com/cn/blogs/china/ekk-amazon-elasticsearch-service-amazon-kinesis-and-kibana/


2019-09-26 17:021166
用户头像

发布了 1602 篇内容, 共 71.3 次阅读, 收获喜欢 67 次。

关注

评论

发布
暂无评论
发现更多内容

对比Web3支付赛道主要项目,看为何Zebec生态被严重低估

西柚子

Nacos是什么

华为云开发者联盟

云计算 后端 华为云 12 月 PK 榜

微软宣布 S2C2F 已被 OpenSSF 采用

SEAL安全

microsoft OpenSSF 12 月 PK 榜 S2C2F

小间距LED是一个很有前途的产品

Dylan

LED显示屏 全彩LED显示屏 led显示屏厂家

实例解析丨一文搞定GaussDB CM服务异常

华为云开发者联盟

数据库 虚拟机 华为云 12 月 PK 榜

工作中常用的设计模式--责任链模式

lpe234

Java 后端 设计模式 责任链模式 spring-boot

【DBA100人】Payso张耀辉:学材料专业出身的他转身做了“码农”

OceanBase 数据库

数据库 dba oceanbase

前端一面经典vue面试题(持续更新中)

bb_xiaxia1998

Vue

社招前端经典vue面试题汇总

bb_xiaxia1998

Vue

前端工程师常考手写面试题指南

helloworld1024fd

JavaScript

js函数柯里化-面试手写版

helloworld1024fd

JavaScript

JS继承有哪些,你能否手写其中一两种呢?

helloworld1024fd

JavaScript

火山引擎DataTester:如何用A/B测试做产品增长?

字节跳动数据平台

大数据 AB testing实战 12 月 PK 榜

瓴羊Quick BI:多项自助分析功能提升企业数据分析能力

夏日星河

前端手写面试题合集

helloworld1024fd

JavaScript

源码深度解析之 Spring IOC

小小怪下士

Java spring spring ioc

CDH+Kylin三部曲之三:Kylin官方demo

程序员欣宸

大数据 kylin 12月月更

vue面试之Composition-API响应式包装对象原理

bb_xiaxia1998

Vue

《迈向智能世界》计算白皮书正式上线

科技热闻

阿里云携手深势科技,助力泓博医药加速药物研发

云布道师

阿里云 药物研发

React的useLayoutEffect和useEffect执行时机有什么不同

beifeng1996

React

AngularJS进阶(三十一)AngularJS项目开发技巧之获取模态对话框中的组件ID

No Silver Bullet

项目开发 AngularJS 12月月更

一针见血!Spring Boot终极手册来袭:从入门到实战

程序知音

Java 分布式 微服务 springboot 后端技术

低代码平台的五大核心引擎能力

元年技术洞察

低代码 数字化转型 方舟平台

React循环DOM时为什么需要添加key

beifeng1996

React

前端react面试题指北

beifeng1996

React

vue这些原理你都知道吗?(面试版)

bb_xiaxia1998

Vue

react面试题总结一波,以备不时之需

beifeng1996

React

快来给你的宠物视频加个表情特效吧

华为云开发者联盟

人工智能 华为云 12 月 PK 榜

搭建云上日志收集分析系统(三)_云计算_亚马逊云科技 (Amazon Web Services)_InfoQ精选文章