Background
Collecting and analyzing application logs is critical for operations teams. Among common solutions, the open-source ELK stack (Elastic Stack: Elasticsearch, Logstash, Kibana) is currently a popular choice. Below we discuss a similar solution built on a cloud-native design: EKK (Amazon Elasticsearch Service, Amazon Kinesis, and Kibana).
The advantage of EKK is that its components are AWS managed services, so there is nothing to install or operate yourself, and they integrate easily with other AWS services. This makes it straightforward to deploy a reliable, scalable, secure, fault-tolerant, decoupled, event-driven solution.
In a traditional Elasticsearch deployment, log data keeps growing, which makes lifecycle management of that data increasingly important (ILM, index lifecycle management, a new feature addressing exactly this need, debuted in Elasticsearch 7.0). This article does not cover ILM; instead it presents another approach: using Lambda to rotate the data.
Rotating Data in Amazon ES with Lambda
1. Create an IAM Role for Lambda
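The IAM console's role wizard is the simplest way to do this. As a rough sketch, and only as an assumption about what the functions in this post need, the role should be assumable by Lambda and allow signed HTTP calls to the ES domain, writing to CloudWatch Logs, and (for the S3 loader in the second half of this post) reading the log bucket. The role name, account ID, and domain ARN below are placeholders:

import json
import boto3

iam = boto3.client('iam')

# Trust policy so the Lambda service can assume the role.
trust = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sts:AssumeRole"
    }]
}

# Hypothetical ARNs -- replace with your own account, domain, and bucket.
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {   # Signed HTTPS calls to the Amazon ES domain.
            "Effect": "Allow",
            "Action": ["es:ESHttpGet", "es:ESHttpPost", "es:ESHttpPut", "es:ESHttpDelete"],
            "Resource": "arn:aws-cn:es:cn-north-1:111122223333:domain/ekk-log/*"
        },
        {   # Function logs to CloudWatch Logs.
            "Effect": "Allow",
            "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
            "Resource": "*"
        },
        {   # Read access used by the S3-to-ES loader later in this post.
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:ListBucket"],
            "Resource": ["arn:aws-cn:s3:::zhnc-ekk-full-log", "arn:aws-cn:s3:::zhnc-ekk-full-log/*"]
        }
    ]
}

iam.create_role(RoleName='lambda-es-rotate', AssumeRolePolicyDocument=json.dumps(trust))
iam.put_role_policy(RoleName='lambda-es-rotate', PolicyName='es-rotate-policy',
                    PolicyDocument=json.dumps(policy))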
2. Create a Layer in Lambda
Put the dependency packages the project needs into a layer so they are available to the Lambda function.
Layer documentation: https://docs.aws.amazon.com/zh_cn/lambda/latest/dg/configuration-layers.html
Reference commands:
mkdir python
pip3 install elasticsearch-curator -t ./python/
zip -q -r layer.zip ./python
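The resulting layer.zip can be uploaded in the Lambda console, or published programmatically. A minimal sketch with boto3 (the layer name and runtime are examples, not from the original post):

import boto3

lam = boto3.client('lambda', region_name='cn-north-1')

with open('layer.zip', 'rb') as f:
    resp = lam.publish_layer_version(
        LayerName='es-curator-deps',       # arbitrary example name
        Description='elasticsearch + curator dependencies for ES rotation',
        Content={'ZipFile': f.read()},
        CompatibleRuntimes=['python3.7'],
    )
print(resp['LayerVersionArn'])             # attach this ARN to the function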
3. Create the Lambda function ES-Rotate
Configure the Layers the function uses, then save.
Next, set the function code: in the Function code area, update the code and replace the host value with the endpoint of the ES cluster we created.
import json
import boto3
from requests_aws4auth import AWS4Auth
from elasticsearch import Elasticsearch, RequestsHttpConnection
import curator

host = 'search-ekk-log-vpfpqvgbxnom3ctwvz5evv2du4.cn-north-1.es.amazonaws.com.cn'  # For example, search-my-domain.region.es.amazonaws.com
region = 'cn-north-1'  # For example, us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

def lambda_handler(event, context):
    es = Elasticsearch(
        hosts=[{'host': host, 'port': 443}],
        http_auth=awsauth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection
    )
    index_list = curator.IndexList(es)
    index_list.filter_by_age(source='name', direction='older', timestring='%Y-%m-%d', unit='days', unit_count=1)
    print("Found %s indices to delete" % len(index_list.indices))
    # If our filtered list contains any indices, delete them.
    if index_list.indices:
        curator.DeleteIndices(index_list).do_action()
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }
Adjust the Lambda function's memory and timeout settings.
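These can be changed on the function's configuration page, or programmatically. The values below are only a reasonable starting point I am assuming, not a recommendation from the original post:

import boto3

lam = boto3.client('lambda', region_name='cn-north-1')
lam.update_function_configuration(
    FunctionName='ES-Rotate',
    MemorySize=256,   # MB; example value
    Timeout=300,      # seconds; example value, well above the 3-second default
)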
4. Test
Define the test event's input.
Since this Lambda does not need any input parameters, the default test event settings can be used.
Run the test.
Then check in Amazon ES that the expired indices are gone (a quick way to verify is sketched after the reference link below).
Reference: https://docs.aws.amazon.com/zh_cn/elasticsearch-service/latest/developerguide/curator.html
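A minimal verification sketch, reusing the host and awsauth setup from the rotation code above: list the remaining indices and confirm that only indices newer than the cut-off are left.

from elasticsearch import Elasticsearch, RequestsHttpConnection

# host and awsauth are defined exactly as in the rotation function above.
es = Elasticsearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=awsauth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
)
# Only indices newer than the cut-off should remain after the rotation run.
print(es.cat.indices(format='json'))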
5. Set up the scheduled rotation in CloudWatch
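In the console this is a CloudWatch Events scheduled rule targeting the ES-Rotate function. The same setup, sketched with boto3; the rule name and schedule are examples and the function ARN is a placeholder:

import boto3

events = boto3.client('events', region_name='cn-north-1')
lam = boto3.client('lambda', region_name='cn-north-1')

# Placeholder ARN -- use the real ARN of the ES-Rotate function.
function_arn = 'arn:aws-cn:lambda:cn-north-1:111122223333:function:ES-Rotate'

rule = events.put_rule(Name='es-rotate-daily', ScheduleExpression='rate(1 day)')
events.put_targets(Rule='es-rotate-daily',
                   Targets=[{'Id': 'es-rotate', 'Arn': function_arn}])

# Allow CloudWatch Events to invoke the function.
lam.add_permission(FunctionName='ES-Rotate',
                   StatementId='es-rotate-daily-event',
                   Action='lambda:InvokeFunction',
                   Principal='events.amazonaws.com',
                   SourceArn=rule['RuleArn'])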
Load to ES from S3
1. Create the Lambda function s3-to-es-bulk-by-hour
2. Configure the Layer
3. Configure the code
Set bucket to the bucket configured in Kinesis Firehose.
Set host to the ES domain endpoint.
import boto3
import re
import requests
from requests_aws4auth import AWS4Auth
import json
from elasticsearch.helpers import bulk
import time
from elasticsearch import Elasticsearch, RequestsHttpConnection

region = 'cn-north-1'  # e.g. us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

bucket = 'zhnc-ekk-full-log'
host = 'search-ekk-log-vpfpqvgbxnom3ctwvz5evv2du4.cn-north-1.es.amazonaws.com.cn'
type = 'log'
headers = {"Content-Type": "application/json"}

s3 = boto3.client('s3', region_name=region)

def get_all_s3_keys(bucket, prefix):
    keys = []
    kwargs = {'Bucket': bucket, 'Prefix': prefix}
    while True:
        resp = s3.list_objects_v2(**kwargs)
        for obj in resp['Contents']:
            keys.append(obj['Key'])
        try:
            kwargs['ContinuationToken'] = resp['NextContinuationToken']
        except KeyError:
            break
    return keys

# Lambda execution starts here
def lambda_handler(event, context):
    print(event)
    for record in event['Records']:
        msg = json.loads(json.dumps(eval(record['Sns']['Message'])))
        year = msg['year']
        month = msg['month']
        day = msg['day']
        hour = msg['hour']
        index = 'apachelog-{}-{}-{}-{}'.format(year, month, day, hour)
        print(index)
        keys = get_all_s3_keys(bucket, "apachelog/{}/{}/{}/{}".format(year, month, day, hour))
        print(len(keys))
        es = Elasticsearch(
            hosts=[{'host': host, 'port': 443}],
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection
        )
        for key in keys:
            ACTIONS = []
            obj = s3.get_object(Bucket=bucket, Key=key)
            body = obj['Body'].read()
            lines = body.splitlines()
            for line in lines:
                document = json.loads(line)
                action = {
                    "_index": index,
                    "_type": type,
                    "_source": document
                }
                ACTIONS.append(action)
            success, _ = bulk(es, ACTIONS, index=index, raise_on_error=True)
            print('Performed %d actions' % success)
4. Set the memory and timeout.
5. Test
This Lambda is triggered by SNS, so create a simulated SNS event.
Set Message to the event you want to test; the Lambda will read the corresponding S3 files into ES.
The message content is:
{
  "Records": [
    {
      "EventSource": "aws:sns",
      "EventVersion": "1.0",
      "EventSubscriptionArn": "arn:aws-cn:sns:cn-north-1:725362542198:s3-to-es-by-day:caf9a3b1-679c-4604-9b65-f15dca3b5b18",
      "Sns": {
        "Type": "Notification",
        "MessageId": "71c6da65-49bf-5301-8270-8ff199faaa1b",
        "TopicArn": "arn:aws-cn:sns:cn-north-1:725362542198:s3-to-es-by-day",
        "Subject": "None",
        "Message": "{'year':'2019','month':'06','day':'30','hour':'02'}",
        "Timestamp": "2019-07-02T03:22:45.020Z",
        "SignatureVersion": "1",
        "Signature": "ihaGN/JL8u/v57xEY1RTFekpUpgVukM9Ebj9IIM9Rr9KGkUMe6dO7hze7estD0yM9K0QRQAreQ5XiB0Tfj/jOCvyjL9IrRcTplQcWPzMHmVqd4C3942gduFkHyul2+lYa0DJZM46J/Yy7mihe9EfXUySf2Eyok4NsUC6WtnbyJPN17FG1t4fnEWpRwU2Yg+MLM4bJWr3sK5/6xRnUVerLlMm5tCsynybW6FQCYsVgl7SJLW6nBmbCe3v6jRMuKCNW8xptVyEAnII4h5uPVElts0IWhnE+EQG3FNFmOZmj8OLZutRadSrNFexRMZebmKwRZRD5dTaCoD5E6v6TTYGbQ==",
        "SigningCertUrl": "https://sns.cn-north-1.amazonaws.com.cn/SimpleNotificationService-3250158c6506d40f628c21ed8dad1787.pem",
        "UnsubscribeUrl": "https://sns.cn-north-1.amazonaws.com.cn/?Action=Unsubscribe&SubscriptionArn=arn:aws-cn:sns:cn-north-1:725362542198:s3-to-es-by-day:caf9a3b1-679c-4604-9b65-f15dca3b5b18",
        "MessageAttributes": {}
      }
    }
  ]
}
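Note that the Message above is not strict JSON (it uses single quotes), which is why the loader parses it with eval. If you keep that message format, ast.literal_eval is a safer drop-in than eval; a small sketch of that variant, intended to replace the corresponding line inside lambda_handler:

import ast

# Inside the loop over event['Records'] in lambda_handler:
raw = record['Sns']['Message']   # e.g. "{'year':'2019','month':'06','day':'30','hour':'02'}"
msg = ast.literal_eval(raw)      # parses the dict literal without executing arbitrary code
index = 'apachelog-{year}-{month}-{day}-{hour}'.format(**msg)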
With this test event, the Lambda reads the log files under the prefix apachelog/2019/06/30/02 in the zhnc-ekk-full-log bucket.
Click Test to run it.
In ES you can see that the index has been imported.
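To double-check the import, you can query the new index's document count with the same signed client the loader uses (the index name follows the apachelog-<year>-<month>-<day>-<hour> pattern built by the code, so for the test message above it is apachelog-2019-06-30-02):

# es is the same signed Elasticsearch client as in the loader function above.
resp = es.count(index='apachelog-2019-06-30-02')
print('documents indexed:', resp['count'])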
About the Author
Chen Zhen is an AWS Solutions Architect responsible for consulting on and designing architectures based on the AWS cloud and for promoting the AWS platform and its solutions in China. He has more than ten years of experience with distributed applications and distributed big-data processing.
This article is reposted from the AWS technical blog.
Original link:
https://amazonaws-china.com/cn/blogs/china/ekk-amazon-elasticsearch-service-amazon-kinesis-and-kibana/