Flink任务一般为实时不断运行的任务,如果没有任务监控, 任务异常时无法第一时间处理会比较麻烦。 这里通过调用API接口方式来获取参数,实现任务监控。

Flink任务监控(基于API接口编写shell脚本) 一 flink-on-yarn 模式 二 编写shell 脚本 

监控集群指标

http://rm-http-address:port/ws/v1/cluster/metrics

 响应正文

**

**

0

**

**

**

0

**

**

0

0

**

**

0

**

0

0

**

**

53

**

0

**

0

**

0

0

0

**

0

0

false

clusterMetrics 对象的元素

项目数据类型描述apps已提交int提交的申请数量应用已完成int完成的申请数量apps待定int待处理的申请数量应用程序正在运行int正在运行的应用程序数apps失败int失败的应用程序数应用已杀死int被终止的应用程序数保留MB长预留的内存量(以 MB 为单位)可用MB长可用内存量(以 MB 为单位)已分配MB长分配的内存量(以 MB 为单位)总MB长总内存量(以 MB 为单位)保留虚拟核心长保留的虚拟核心数可用虚拟核心长可用虚拟核心数分配的虚拟核心长分配的虚拟核心数totalVirtualCores 虚拟核心数长虚拟核心总数容器已分配int分配的容器数容器保留int保留的容器数容器挂起int待处理的容器数总节点数int节点总数活动节点int活动节点数丢失节点int丢失的节点数不健康的节点int不正常的节点数停用节点int停用的节点数已停用节点int停用的节点数rebooted节点int重新启动的节点数shutdown节点int关闭的节点数

获取所有application

curl -s http://XXX:8088/ws/v1/cluster/apps

获取 state值为 RUNNING 的application任务

curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING 

获取这个任务单个信息 

curl -s http://XXX:8088/ws/v1/cluster/apps/application_1619074605427_0063 |jq .app.state

请注意,根据安全设置,用户可能无法看到所有字段。 

项目数据类型描述编号字符串应用程序 ID用户字符串启动应用程序的用户名字字符串应用程序名称队列字符串提交应用程序的队列州字符串根据 ResourceManager 的应用程序状态 - 有效值是 YarnApplicationState 枚举的成员:NEW、NEW_SAVING、SUBMITTED、ACCEPTED、RUNNING、FINISHED、FAILED、KILLEDfinalStatus字符串应用程序的最终状态(如果已完成)(由应用程序本身报告)有效值是 FinalApplicationStatus 枚举的成员:UNDEFINED、SUCCEEDED、FAILED、KILLED进展浮以百分比表示的申请进度trackingUI字符串跟踪 URL 当前指向的位置 - 历史记录(用于历史记录服务器)或 ApplicationMastertrackingUrl字符串可用于跟踪应用程序的 Web URL诊断字符串详细的诊断信息clusterId长集群 ID应用程序类型字符串应用程序类型application标签字符串应用程序的逗号分隔标记优先权字符串所提交申请的优先权开始时间长应用程序启动的时间(自纪元以来的毫秒)完成时间长应用程序完成的时间(以纪元以来的毫秒数为单位)elapsedTime长自应用程序启动以来经过的时间(以毫秒为单位)amContainer日志字符串应用程序主容器日志的 URLamHostHttp地址字符串应用程序主机的节点 http 地址amRPCAddress字符串应用程序主机的 RPC 地址已分配MBint分配给应用程序正在运行的容器的内存总和(以 MB 为单位)已分配VCoresint分配给应用程序正在运行的容器的虚拟核心的总和running容器int当前为应用程序运行的容器数memorySeconds长应用程序分配的内存量(兆字节-秒)vcore秒数长应用程序分配的 CPU 资源量(虚拟内核 - 秒)queueUsagePercentage浮应用正在使用的队列资源的百分比clusterUsage百分比浮应用正在使用的群集资源的百分比。抢占ResourceMB长抢占式容器使用的内存preemptedResourceVCores长抢占容器使用的虚拟核心数numNonAMContainer抢占int抢占的标准容器数numAMContainer抢占int抢占的应用程序主容器数logAggregationStatus字符串日志聚合的状态 - 有效值是 LogAggregationStatus 枚举的成员:DISABLED、NOT_START、RUNNING、RUNNING_WITH_FAILURE、SUCCEEDED、FAILED、TIME_OUTunmanaged应用程序布尔应用程序是否处于非托管状态。appNodeLabelExpression字符串节点标签表达式,用于标识默认情况下应在其上运行应用程序容器的节点。amNodeLabel表达式字符串节点标签表达式,用于标识应用程序的 AM 容器预期在其上运行的节点。

jq,是linux一个很方便的json处理工具

通俗的说就是一个能够接受json,处理json,输出json的程序,反正很好用。

安装起来也非常的方便,直接使用yum即可安装。linux下离线安装jq工具 - 代码天地 (codetd.com)

yum install jq

编写shell脚本

由于公司离线yarn和实时yarn 采用是分开的方式。 只需要监控实时yarn 任务有没有处于RUNNING,达到监控的目的 这里shell脚本也只记录,flink-on-yarn 这种部署方式任务监控 shell脚本水平有限,大家多多谅解,欢迎指导

shell脚本实现功能: 获取线运行job任务,记录到日志文件。下一次脚本调用时候读取日志文件,判断状态。 不是RUNNING,就告警同时重新记录日志。

#!/bin/bash

Joblist=`cat /opt/shell/logs/flink_job.log` #获取记录job的log文件

let i=0 #获取任务数

let log_count=0 #获取日志中的任务数

start_count=RUNNING #判断任务是否存在异常

############## 1 判断日志文件内容是否为空,为空时自动读取flink任务并记录到日志文件 #########

if [ -z "$Joblist" ]

then

while :

do

job_id[$i]=`curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`

if [ ${job_id[$i]} = "null" ];then

break

else

echo ${job_id[$i]}

echo ${job_id[$i]}>>/opt/shell/logs/flink_job.log

let i++

fi

done

fi

############## 2 读取文件中JOB任务 ##################

let i=0

while read line

do

JOB[$i]=$line

let i++

done

log_count=$i #获取日志中的任务数

########### 3 判断任务状态,是否为RUNNIG,不是则邮件告警 ###############

for ((j=0;j

do

JOB_ID=${JOB[$j]//\"}

JOB_status=`curl -s http://XXXX:8088/ws/v1/cluster/apps/$JOB_ID | jq .app.state`

JOB_NAME=`curl -s http://XXX:8088/ws/v1/cluster/apps/$JOB_ID | jq .app.name`

START=$[`curl -s http://XXX:8088/ws/v1/cluster/apps/$JOB_ID | jq .app.startedTime` / 1000]

# echo "JOB_NAME: "$JOB_NAME

# echo 启动时间: `date -d @$START +"%F %H:%M:%S"`

# echo "JOB_status: " ${JOB_status//\"}

#echo -e "【$JOB_NAME】 \n JOB_ID: $JOB_ID \n 启动时间: `date -d @$START +"%F %H:%M:%S"` \n 检查时间: `date "+%Y-%m-%d %H:%M:%S"` \n 目前状态: $JOB_status"

#echo "=============================================="

if [ ${JOB_status//\"} != "RUNNING" ];then

SUBJECT="【异常告警】Flink任务异常"

TEXT="Flink任务 【$JOB_NAME】 异常故障 \n\nJOB_ID: $JOB_ID\n\n启动时间: `date -d @$START +"%F %H:%M:%S"` \n\n检查时间: `date "+%Y-%m-%d %H:%M:%S"` \n\n目前状态: $JOB_status"

echo -e $TEXT | mail -s $SUBJECT 邮箱地址

start_count=erron

fi

done

########### 4 出现任务异常,重新读取job 任务记录到日志文件 ###############

let i=0

if [ $start_count == "erron" ];then

echo '重新写入日志文件'

while :

do

job_id[$i]=`curl -s http://XXXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`

if [ ${job_id[$i]} = "null" ];then

break

elif [ $i == 0 ]; then

echo ${job_id[$i]}>/opt/shell/logs/flink_job.log

else

echo ${job_id[$i]}>>/opt/shell/logs/flink_job.log

fi

let i++

done

start_count=RUNNING

fi

########### 5 判断线上任务数是否一致,是否有新任务增加 ###############

let i=0

while :

do

job_id[$i]=`curl -s http://XXX:8088/ws/v1/cluster/apps?state=RUNNING |jq .apps.app[$i].id`

if [ ${job_id[$i]} = "null" ];then

break

else

let i++

fi

done

let count=$i #线上任务数

echo "==========================线上最新RUNNING状态任务数: "$count

echo "==========================日志RUNNING状态任务数: "$log_count

if [ ! $count -eq $log_count ]; then

echo "现有RUNNING状态任务数不相等于已记录的任务数"

echo ${job_id[0]} >/opt/shell/logs/flink_job.log

for ((i=1;i

do

echo "重新写入JOB: "${job_id[$i]}

echo ${job_id[$i]}>> /opt/shell/logs/flink_job.log

done

fi

echo "======================当前时间: `date "+%Y-%m-%d %H:%M:%S"`======================================="

echo ================================================================================================

echo =====================================本次crontab监控结束========================================

echo ================================================================================================

Yarn REST API 使用指南-阿里云开发者社区

Apache Hadoop 3.0.1 – ResourceManager REST API。

好文阅读

评论可见,请评论后查看内容,谢谢!!!评论后请刷新页面。