本文由 dbaplus 社群授权转载。
前言
某业务导致 NameNode RPC 通信频繁,后来观察监控发现,是由于该业务获取 HDFS 列表文件的频率过于频繁。检查代码后,优化由 20s 获取一次目录列表改为 5 分钟获取一次,获取列表的 RPC 操作次数下降了约 1.5 倍,平均每秒减少了 2~3w 次的 RPC 操作。
还有很多业务场景,通过分析观察 RPC 画像,都发现了其不合理性,这里就不一一列举了。本文主要记录如何通过 ELK 快速分析 NameNode RPC 操作并对接 Grafana 展示。
通过 ELK 快速分析 NameNode RPC 操作
ELK 是当前比较主流的分布式日志收集处理工具。这里采用 Filebeat→Kafka 集群→Logstash→ES→Kibana。
采用原因:
1)Filebeat 是基于原先 logstash-forwarder 的源码改造出来的。换句话说:Filebeat 就是新版的 logstash-forwarder,也会是 Elastic Stack 在 shipper 端的第一选择。
2)小贴士:虽然 LogStash::Inputs::TCP 用 Ruby 的 Socket 和 OpenSSL 库实现了高级的 SSL 功能,但 Logstash 本身只能在 SizedQueue 中缓存 20 个事件。这就是我们建议在生产环境中换用其他消息队列的原因。
而 Redis 服务器是 Logstash 官方推荐的 Broker 选择,Broker 角色也就意味着会同时存在输入和输出两个插件。
Kafka 是一个高吞吐量的分布式发布订阅日志服务,具有高可用、高性能、分布式、高扩展、持久性等特性。目前已经在各大公司中广泛使用。和之前采用 Redis 做轻量级消息队列不同,Kafka 利用磁盘作队列,所以也就无所谓消息缓冲时的磁盘问题。此外,如果公司内部已有 Kafka 服务在运行,Logstash 也可以快速接入,免去重复建设的麻烦。
3)目前 Logstash1.5 版本已自带支持 Kafka 插件,所以只需要学会如何书写 Logstash 规则,并且 Kafka 消费使用 high-level 消费。
4)Filebeat 部署在应用服务器上(只负责 Logstash 的读取和转发,降低 CPU 负载消耗,确保不会抢占应用资源),Logstash、ES、Kibana 在一台服务器上(此处的 Logstash 负责日志的过滤,会消耗一定的 CPU 负载,可以考虑如何优化过滤的语法步骤来达到降低负载)。
具体搭建步骤:Filebeat 安装使用(思考后决定 Filebeat 使用 Zip 安装或者 tar.gz 方便修改配置打包分发。)→Logstash 插件配置。
以下是架构图:
1、Filebeat 采集 hdfs-audit.log 日志传输给 Kafka 或者 Logstash
[hadoop@lf319-m3-002 filebeat]$ vi dynamically.config/audit-logstash.yml
filebeat.prospectors:
- input_type: log
paths:
- "/var/log/hadoop-hdfs/hdfs-audit.log"
harvester_buffer_size: 32768
scan_frequency: 1s
backoff: 10ms
processors:
- drop_fields:
fields: ["beat", "beat.name", "beat.hostname","beat.version","input_type","offset","@timestamp","type","source"]
output.logstash:
hosts: ["logstash-host:5044"," logstash-host:5045"]
loadbalance: true
worker: 4
bulk_max_size: 4096
xpack.monitoring:
enabled: true
elasticsearch:
hosts: ["https://es-host1:9200", "https:// es-host2:9200"]
username: beats_system
password: beat@123
复制代码
2、Logstash 进一步分解日志,格式化日志数据
这里需要我们先查看下日志的格式,然后选择方便的日志格式化方式来解析日志。
日志格式案例:
2019-08-25 13:11:58,630 INFO FSNamesystem.audit: allowed=true ugi=lf_zh_pro (auth:SIMPLE) ip=/dn-ip cmd=getfileinfo src=/user/lf_zh_pro/test/CommonFilter/sync/biz_id=B43/day_id=20190825/prov_id=089/part-00019-1566675749932.gz dst=null perm=null proto=rpc
2019-08-25 13:11:58,630 INFO FSNamesystem.audit: allowed=true ugi=lf_xl_bp (auth:SIMPLE) ip=/dn-ip cmd=create src=/user/lf_xl_bp/lf_xl_src.db/src_d_trip_all/date_id=20190825/hour_id=13/minute_id=00/.hive-staging_hive_2019-08-25_13-10-18_301_9180087219965934496-1/_task_tmp.-ext-10002/prov_id=031/_tmp.000238_0dst=null perm=lf_xl_bp:lf_xl_bp:rw-rw-r-- proto=rpc
2019-08-25 13:11:58,630 INFO FSNamesystem.audit: allowed=true ugi=ubd_obx_test (auth:SIMPLE) ip=/ dn-ip cmd=rename
复制代码
通过观察可以发现上面的每条日志格式都是一致的,都由时间戳、日志级别、是否开启审计、用户、来源 IP、命令类型这几个字段组成。那么相较于 grok 来说 dissect 更加简明。
Dissect 的使用规则:https://www.elastic.co/guide/en/logstash/current/plugins-filters-dissect.html
Logstash 配置如下:
input {
beats {
port => "5045"
}
}
filter {
if "/user/if_ia_pro/output/test" in [message] {
dissect {
mapping => { "message" => "%{logd} %{drop} %{level} %{log-type}: %{?allowed}=%{&allowed} %{?ugi}=%{&ugi} (%{?authtype}) %{?ip}=/%{&ip} %{?cmd}=%{&cmd} %{}=/user/if_ia_pro/output/test/%{src2}/%{src3}/%{} %{?dst}=%{&dst} %{?perm}=%{&perm} %{?proto}=%{&proto}" }
add_field => {
"srctable" => "/user/if_ia_pro/output/test/%{src2}/%{src3}"
"logdate" => "%{logd} %{drop}"
}
remove_field => ['message','src2','src3','logd','drop']
}
}
else if "/user/lf_zh_pro/lf_safedata_pro/output/" in [message] {
dissect {
mapping => { "message" => "%{logd} %{drop} %{level} %{log-type}: %{?allowed}=%{&allowed} %{?ugi}=%{&ugi} (%{?authtype}) %{?ip}=/%{&ip} %{?cmd}=%{&cmd} %{}=/user/lf_zh_pro/lf_safedata_pro/output/%{src2}/%{} %{?dst}=%{&dst} %{?perm}=%{&perm} %{?proto}=%{&proto}" }
add_field => {
"srctable" => "/user/lf_zh_pro/lf_safedata_pro/output/%{src2}"
"logdate" => "%{logd} %{drop}"
}
remove_field => ['message','src2','drop']
}
}
else if "/files/" in [message] {
dissect {
mapping => { "message" => "%{logd} %{drop} %{level} %{log-type}: %{?allowed}=%{&allowed} %{?ugi}=%{&ugi} (%{?authtype}) %{?ip}=/%{&ip} %{?cmd}=%{&cmd} %{}=/files/%{src2}/%{} %{?dst}=%{&dst} %{?perm}=%{&perm} %{?proto}=%{&proto}" }
add_field => {
"srctable" => "/files/%{src2}"
"logdate" => "%{logd} %{drop}"
}
remove_field => ['message','src2','drop']
}
}
else {
dissect {
mapping => { "message" => "%{logd} %{drop} %{level} %{log-type}: %{?allowed}=%{&allowed} %{?ugi}=%{&ugi} (%{?authtype}) %{?ip}=/%{&ip} %{?cmd}=%{&cmd} %{}=/%{src}/%{src1}/%{src2}/%{src3}/%{} %{?dst}=%{&dst} %{?perm}=%{&perm} %{?proto}=%{&proto}" }
add_field => {
"srctable" => "/%{src}/%{src1}/%{src2}/%{src3}"
"logdate" => "%{logd} %{drop}"
}
remove_field => ['message','src','src1','src2','src3','logd','drop']
}
}
date {
match => [ "logdate","ISO8601" ]
target => "@times"
remove_field => ['logdate']
}
}
output {
elasticsearch {
hosts => ["es-host:9200"]
index => "logstash-hdfs-auit-%{+YYYY.MM.dd}"
user => "elastic"
password => "password"
}
stdout { }
}
复制代码
3、ES 上观察数据
Filebeat 和 Logstash 配置好采集分析 hdfs-audit.log 之后启动进程,到 ES 上观察会发现创建有 logstash-hdfs-auit- YYYY.MM.dd 的 index。
具体查看数据,可以看到已经具备多个需要使用到的字段。
Grafana 配置 NameNode RPC 操作
最后一步就需要在 Grafana 上配置连接 ES 数据库。
然后创建 Dashboard 依次配置以下几种查询展示:
1)集群整体 RPC 每分钟连接次数
2)HDFS 路径 All 下 All 类型每分钟操作计数
3)All 类型操作计数最多的 hdfs 路径
4)路径 All 下操作计数排行前五的类型 和 All 操作类型下操作计数前五的路径
总结
那么现在对于企业来说,不管是在物理机上还是云上,玩自己的大数据平台跑生产任务,就不可避免会有不够合理不够优化的任务,比如最简单的集群对拷任务出现异常中断时,我们通常会挂定时任务并对 hadoop distcp 添加-update 参数,进行对比更新覆盖,这时当定时吊起的过于频繁,就会发现当对拷目录下文件数越来越多,NameNode 对该目录的 listStatus 类型的 RPC 连接会激增,这时我们就需要优化对拷任务。
RPC 的监控只是监控大数据平台的一个指标,这里通过这篇文章,带大家了解下如何快速地采集分析平台日志,并进行展示监控。
作者介绍:
小火牛,项目管理高级工程师,具有多年大数据平台运维管理及开发优化经验。管理过多个上千节点集群,擅长对外多租户平台的维护开发。信科院大数据性能测试、功能测试主力,大厂 PK 获得双项第一。
原文链接:
https://mp.weixin.qq.com/s?__biz=MzI4NTA1MDEwNg==&mid=2650782456&idx=2&sn=ca734daeca8af622bda4ec9f424ca75a&chksm=f3f90f6dc48e867bf6d60de56843e5338af9b95ca88fbb7bd5a61cb0b9107c44bdaf887a874b&scene=27#wechat_redirect
评论