--登陆INFA资料库,运行下面的SQL

--想要更加个性化查询的话注意看SQL倒数第二第三行的备注

SELECT
RUN_DATE,
START_TIME ,
END_TIME,
FOLIDER ,
WORKFLOW,
WORKLET_LVL3,
WORKLET_LVL2,
WORKLET_LVL1,
SESSION_NAME,
MAPPING_NAME,
RUN_STATUS,
SUSSESSFUL_SOURC_ROWS,
FAILED_SOURCE_ROWS,
SUCCCESSFUL_ROWS,
FAILED_ROWS,
RUN_ERR_CODE,
RUN_ERR_MSG,
FIRST_ERROR_CODE,
FIRST_ERROR_MSG,
LAST_ERROR_CODE,
LAST_ERROR,
SESSION_LOG_FILE,
BAD_FILE_LOCATION
FROM (SELECT
TRUNC(T1.START_TIME) AS RUN_DATE, --跑数日期
T1.START_TIME AS START_TIME, --开始时间
T1.END_TIME AS END_TIME, --结束时间
T.SUBJECT_AREA AS FOLIDER, --FOLDER
T.WORKFLOW_NAME AS WORKFLOW, --WORKFLOW
T4.INSTANCE_NAME AS WORKLET_LVL3, --大爷级WORKLET
T3.INSTANCE_NAME AS WORKLET_LVL2, --爷爷级WORKLET
T2.INSTANCE_NAME AS WORKLET_LVL1, --父级WORKLET
T1.INSTANCE_NAME AS SESSION_NAME, --SESSION
decode(T1.RUN_STATUS_CODE,
1,'Succeeded',
2,'Disabled',
3,'Failed',
4,'Stopped',
5,'Aborted',
6,'Running',
7,'Suspending',
8,'Suspended',
9,'Stopping',
10,'Aborting',
11,'Waiting',
12,'Scheduled',
13,'Unscheduled',
14,'Unknown',
15,'Terminated'
) as RUN_STATUS, --运行状态
T1.RUN_ERR_CODE RUN_ERR_CODE, --错误代码
T1.RUN_ERR_MSG RUN_ERR_MSG, --错误信息
T.FIRST_ERROR_CODE, --第一次发生错误的代码
T.FIRST_ERROR_MSG, --第一次发生错误的信息
T.LAST_ERROR_CODE, --最后发生错误的代码
T.LAST_ERROR, --最后发生错误的信息
T.SESSION_LOG_FILE, --SESSION LOG路径
T.BAD_FILE_LOCATION, --BAD_FILE路径
T.MAPPING_NAME MAPPING_NAME, --MAPPING
T.SUCCESSFUL_SOURCE_ROWS SUSSESSFUL_SOURC_ROWS, --读取源成功条数
T.FAILED_SOURCE_ROWS FAILED_SOURCE_ROWS, --读取源失败条数
T.SUCCESSFUL_ROWS SUCCCESSFUL_ROWS, --插入目标成功条数
T.FAILED_ROWS FAILED_ROWS --插入目标失败条数
FROM REP_SESS_LOG T
INNER JOIN REP_TASK_INST_RUN T1 --返回session状态信息
ON T.SUBJECT_ID = T1.SUBJECT_ID
AND T.WORKFLOW_ID = T1.WORKFLOW_ID
AND T.WORKFLOW_RUN_ID = T1.WORKFLOW_RUN_ID
AND T.WORKLET_RUN_ID = T1.WORKLET_RUN_ID
AND T.SESSION_ID = T1.TASK_ID
AND T.INSTANCE_ID = T1.INSTANCE_ID
AND T1.TASK_TYPE_NAME = 'Session' LEFT JOIN REP_TASK_INST_RUN T2 --返回父级WKL
ON T1.SUBJECT_ID = T2.SUBJECT_ID
AND T1.WORKFLOW_ID = T2.WORKFLOW_ID
AND T1.WORKFLOW_RUN_ID = T2.WORKFLOW_RUN_ID
AND T1.WORKLET_RUN_ID = T2.CHILD_RUN_ID
AND T2.TASK_TYPE_NAME = 'Worklet' LEFT JOIN REP_TASK_INST_RUN T3 --返回爷爷级WKL,到这一级其实就可以看到session是属于哪个电厂的了
ON T2.SUBJECT_ID = T3.SUBJECT_ID
AND T2.WORKFLOW_ID = T3.WORKFLOW_ID
AND T2.WORKFLOW_RUN_ID = T3.WORKFLOW_RUN_ID
AND T2.WORKLET_RUN_ID = T3.CHILD_RUN_ID
AND T3.TASK_TYPE_NAME = 'Worklet' LEFT JOIN REP_TASK_INST_RUN T4 --返回老大爷级WKL
ON T3.SUBJECT_ID = T4.SUBJECT_ID
AND T3.WORKFLOW_ID = T4.WORKFLOW_ID
AND T3.WORKFLOW_RUN_ID = T4.WORKFLOW_RUN_ID
AND T3.WORKLET_RUN_ID = T4.CHILD_RUN_ID
AND T4.TASK_TYPE_NAME = 'Worklet'
) WHERE
START_TIME > TRUNC(SYSDATE-1) ------筛选时间
AND RUN_STATUS = 'Failed' --session运行状态的筛选,若只想看失败的session信息,则令RUN_STATUS = 'Failed';否则去掉该条件,此时可以看到所有session运行状态 ORDER BY RUN_DATE DESC, WORKFLOW, WORKLET_LVL3, WORKLET_LVL2, WORKLET_LVL1, SESSION_NAME --排序以更好的形式展示出来

查询得到的结果示例如下:

I01-通过查询资料库方式来监控Informatica调度情况-LMLPHP

拿第一条举例,由RUN_STATUS这个字段内容(Failed)可以看出,ETL0_ODS这个folder下的WKF_ETL0这个workflow下的WKL_FMIS_GJXNY worklet下的WKL_T07 worklet 下的WKL_FMIS_APPLSYS worklet下的s_m_ods_fmis_applsys_fnd_flex_value_sets这个session调度出错了,出错原因是无法连接上数据库。

如果想让该SQL每天自动运行,然后将查数结果发送到邮箱的话,我的做法需要下面几个步骤:

(1)将该sql保存到Informatica服务器上的一个文件中(base_sql.txt),有可能需要去掉上面SQL里的中文注释。

(2)Informatica服务器上写一个shell脚本(workflow_dispatch.sh),脚本中调用sqluldr2命令来远程到Informatica资料库上执行base_sql.txt文件中的SQL语句,结果保存到一个文件中。命令如下:

 sqluldr2 user=${conn_str} sql=${py_file_dir}${sql_file} fast=yes field="," head=yes file=${result_dir}${result_file} escape='\' escf=0x0a esct=n

其中escape='\' escf=0x0a esct=n等参数设定是为了去掉sql执行结果中有的数据里面存在换行。

(3)执行完第二步之后,查数结果已经保存到某个文件中( sqluldr2命令中file参数指定的文件)了,接下来只需在shell中调用相应的邮件服务命令将结果文件发送到邮件即可。至于Informatica服务器上的邮件服务,使用msmtp+mutt或者写python脚本或者其他的都行,我采用的python脚本的方式(sendmail.py)。若是想使用msmtp+mutt的方式,可参考以下链接进行配置:http://www.cnblogs.com/suhaha/p/8655033.html

(4)虽然存放查数结果的文件占用空间不多,也应该在shell中定时删除太久之前的文件,比如每次都固定删除一个月之前的文件。

完整的workflow_dispatch.sh脚本示例如下:

#!/bin/bash

check_date=$(date +%Y%m%d_%H%M)

base_dir="/data/ODS_TO_EDW_DATACHECK/"
result_dir=${base_dir}"result_file/"
py_file_dir=${base_dir}"py_file/" #conn_str="repdb01/[email protected]:1521/test" #DEV environment
conn_str="repdb/[email protected]:1521/appdb" #PRD environment result_file="dispatch_result_"${check_date}".csv"
sql_file="base_sql.txt"
tar_file="dispatch_result_"${check_date}".tar.gz"
mail_file="sendmail.py" echo "execute sqluldr2..."
echo -e "\n" sqluldr2 user=${conn_str} sql=${py_file_dir}${sql_file} fast=yes field="," head=yes file=${result_dir}${result_file} escape='\' escf=0x0a esct=n if [ $? -eq ]; then
echo -e "\n"
echo "sqluldr2 run successful!"
echo -e "\n" echo "check the count of error session..."
cd ${result_dir}
cnt=`cat ${result_file} | wc -l`
cnt=`expr ${cnt} - `
echo "[cnt of error sessions ]: "${cnt}
echo -e "\n" #echo "compress the result file into : "${tar_file}
#tar -czvf ${tar_file} ${result_file} && rm -rf ${result_file} echo -e "\n"
echo "Send mail..."
if [ ${cnt} -gt ]; then
python ${py_file_dir}${mail_file} "Result of Informatica Dispatch (PRD)" \
"Infamatica Dispatch finish. "${cnt}" error sessions. Check the attachment for more details." ${result_dir}${result_file} $@
else
python ${py_file_dir}${mail_file} "Result of Informatica Dispatch (PRD)" "Infamatica Dispatch finish. No error." ${result_dir}${result_file} $@
fi else
echo "sqluldr2 run failed!"
exit
fi

完整的sendmail.py脚本示例如下:

#!/usr/bin/python
# -*- coding: UTF-8 -*- import sys
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
from email.utils import formataddr """
该脚本调用格式: python /data/ODS_TO_EDW_DATACHECK/py_file/sendmail.py +theme +body +attachment +recipient1 +recipient2 + ... +recipients
调用示例:python sendmail.py "Result of DataCheck" "23333..This is body..." /data/ODS_TO_EDW_DATACHECK/py_file/sendmail.py [email protected] 备注:添加的附件必须是全路径的,比如:/data/ODS_TO_EDW_DATACHECK/py_file/sendmail.py
""" def sendmail( theme, body, attachment, recipients ): mail_host="smtp.163.com"
mail_user="[email protected]"
mail_pass="wysxxx9527" subject = theme
text = body
file = attachment
receivers = recipients
sender = "[email protected]" message = MIMEMultipart()
message["From"] = formataddr(["INFA SERVER", sender])
message["To"] = Header("ETL Engineer", "utf-8")
message["Subject"] = Header(subject, "utf-8")
message.attach(MIMEText(text, "plain", "utf-8"))
name = file[ file.rindex(r"/")+1 : len(file) ]
att = MIMEText(open(file, "r").read(), "base64", "utf-8")
att["Content-Type"] = "application/octet-stream"
att["Content-Disposition"] = 'attachment; filename=' + '"' + name + '"'
message.attach(att) try:
smtpObj = smtplib.SMTP()
smtpObj.connect(mail_host, 25)
smtpObj.login(mail_user,mail_pass)
smtpObj.sendmail(sender, receivers, message.as_string())
print "\nSend Mail Successfully!"
except smtplib.SMTPException:
print "\nError: Send Mail Failed!"
print str(smtplib.SMTPException) #*********************************************************************************************
if __name__ == '__main__': theme = sys.argv[1]
body = sys.argv[2]
attachment = sys.argv[3]
recipients = sys.argv[4: len(sys.argv)] print "theme: ", theme
print "body: ", body
print "attachment: ", attachment
print "recipients: ", recipients sendmail( theme, body, attachment, recipients )

(5)在Informatica中新建一个Task,Task中调用shell脚本。如下图。然后在INFA每日调度的最后一个session或者最后一个workflow末尾加上该Task即可。

I01-通过查询资料库方式来监控Informatica调度情况-LMLPHP

最后,当然INFA自身也能配置邮件服务,并且配置之后它的报错也能详细到session级别,如下图。说实话我没用过。

不过在项目的实施过程中,开发的session数量一般都是上几百个的,每个session都配置的话会比较麻烦;再者,假设真的每个session都配置了,然后每个session跑成功或者失败了都给你发一封邮件,一般INFA的调度还都在半夜,会疯掉的~

我之所采用上面描述的方法是因为:项目开发之初压根没考虑到调度的监控问题,都项目后期了前台数据对不上之后往前溯源了才发现是某个session挂掉好久了,因此才想起配置这么个东西。

另外,我觉得这种方式比INFA自身的邮件设置方式要相对好一点吧~INFA调度完成之后统一查询,速度快,效率也高,最重要的是每天只给你发一封邮件。。。

I01-通过查询资料库方式来监控Informatica调度情况-LMLPHP

05-11 19:28