原理

触发器监控工作流实例表,当工作流实例表中的状态更新后,针对状态为失败的任务进行企业微信告警。

发送企业微信消息函数

su - postgres
# 必须在pg的主机上线安装requests模块
pip install requests
# 以postgres用户登陆psql客户端到etl数据库
psql etl -U postgres
# 创建插件plpython3u
create extension plpython3u;
# plpython3u为不受信语言,所以只能被超级用户使用
# 在tool模式下建立发送企业微信消息函数tool.sp_send_wechat
CREATE OR REPLACE FUNCTION tool.sp_send_wechat(message json, webhook character varying DEFAULT 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你自己的key'::character varying)
 RETURNS text
 LANGUAGE plpython3u
 SECURITY DEFINER
AS $function$
import requests
import json
"""
/*
 * 作者 : v-yuzhenc
 * 功能 : 给企业微信发送一条消息
 * message : 需要发送的消息,json格式
 * webhook : 企业微信机器人的webhook
 * */
"""
import requests
import json

# 企业微信自定义机器人的webhook地址
p_webhook = webhook
# 要发送的消息内容
p_message = json.loads(message)
# 发送POST请求
response = requests.post(p_webhook, data=json.dumps(p_message), headers={"Content-Type": "application/json"})

# 打印响应结果
return response.text
$function$
;
--将函数直接转给tool
ALTER FUNCTION tool.sp_send_wechat(json, varchar) OWNER TO tool;
--公开函数的执行权限
GRANT ALL ON FUNCTION tool.sp_send_wechat(json, varchar) TO public;
--将函数的执行权限授权给tool用户
GRANT ALL ON FUNCTION tool.sp_send_wechat(json, varchar) TO tool;
\q

远程执行命令函数

  • 由于海豚调度的任务日志是以文件的形式存储在操作系统中,所以,必须在数据库中实现这样一个函数,能够读取海豚服务器的日志文件
su - postgres
# 必须在pg的主机上安装paramiko模块
pip install paramiko
# 以postgres用户登陆psql客户端到etl数据库
psql etl -U postgres
# 上面已经创建了plpython3u插件,这里不需要再次建立了
# 创建远程执行命令函数tool.sp_remote_exec_command_nopass
CREATE OR REPLACE FUNCTION tool.sp_remote_exec_command_nopass(remote_command text, remote_host character varying DEFAULT 'dpmaster'::character varying, remote_port integer DEFAULT 22222, remote_username character varying DEFAULT 'dp'::character varying, remote_return_mode character varying DEFAULT 'stdout'::character varying)
 RETURNS text
 LANGUAGE plpython3u
 SECURITY DEFINER
AS $function$
import paramiko
"""
/*
 * 作者 : v-yuzhenc
 * 功能 : 免密(需要配置ssh免密)在远程服务器执行一条命令
 * remote_command : 需要执行的命令
 * remote_host : 远程主机名或ip
 * remote_port : ssh端口
 * remote_username : 免密登陆的用户
 * remote_return_mode : 返回信息的模式,stderr返回标准错误信息,否则返回标准输出
 * */
"""
# SSH连接信息
host = remote_host
port = remote_port
username = remote_username
private_key_path = '/home/postgres/.ssh/id_rsa'
ssh_command = remote_command

# 连接SSH服务器
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
private_key = paramiko.RSAKey.from_private_key_file(private_key_path)
ssh.connect(host, port, username, pkey=private_key)

# 通过SSH执行命令
stdin, stdout, stderr = ssh.exec_command(ssh_command)
p_stdout = stdout.read().decode().strip()
p_stderr = stderr.read().decode().strip()
# 关闭SSH连接
ssh.close()
# 打印响应结果
if remote_return_mode == 'stderr':
    return None if p_stderr == '' else p_stderr
else:
    return None if p_stdout == '' else p_stdout
$function$
;
-- Permissions
ALTER FUNCTION tool.sp_remote_exec_command_nopass(text, varchar, int4, varchar, varchar) OWNER TO tool;
GRANT ALL ON FUNCTION tool.sp_remote_exec_command_nopass(text, varchar, int4, varchar, varchar) TO tool;
GRANT ALL ON FUNCTION tool.sp_remote_exec_command_nopass(text, varchar, int4, varchar, varchar) TO dp;
\q

企业微信告警触发器

  • 由于企业微信markdown格式的消息艾特指定的人只能通过企业微信中的userid(即用户在企业微信中的账号)调用,所以,我们在海豚调度的元数据表t_ds_user中增加wechat_userid字段,人工将海豚的用户对应的企业微信的userid维护上去
# 以dp用户登录etl数据库
psql etl -U dp
# 增加字段
alter table t_ds_user add wechat_userid varchar(100);
comment on column t_ds_user.wechat_userid is '对应的企业微信的userid';
# 维护wechat_userid中的数据
# 这里根据自己的企业实际情况做
update t_ds_user 
set wechat_userid = 'YuZhenChao'
where user_name = 'yuzhenchao'
;
CREATE OR REPLACE FUNCTION dp.tg_ds_udef_alert_wechat()
 RETURNS trigger
 LANGUAGE plpgsql
AS $function$
/*
 * 作者:v-yuzhenc
 * 功能:海豚调度工作流失败自动告警
 * */
declare
    i record;
    v_content text;
    v_message varchar;
begin
    if new.state in (4,5,6) then 
        for i in (
            select
			     '<@'||d.wechat_userid||'>\r\n# [DolphinScheduler Job ]\r\n> 实例  id  :  ['||a.id::varchar||'/'||b.id||'](https://dolphin.tclpv.com/dolphinscheduler/ui/projects/'||g.code||'/workflow/instances/'||a.id||'?code='||a.process_definition_code||')\r\n> 项目名称 : <font color=\"comment\">'||g.name||'('||g.code||')</font>'||'\r\n> 工作流名 : <font color=\"comment\">'||e.name||'('||a.process_definition_code||')</font>'||'\r\n> 任务名称 : <font color=\"comment\">'||b.name||'('||b.task_code||')</font>'||'\r\n> 任务类型 : <font color=\"comment\">'||b.task_type||'</font>\r\n> 开始时间 : <font color=\"comment\">'||to_char(b.start_time,'yyyy-mm-dd hh24:mi:ss')||'</font>\r\n> 结束时间 : <font color=\"comment\">'||to_char(b.end_time,'yyyy-mm-dd hh24:mi:ss')||'</font>\r\n> 所属用户 : <font color=\"comment\">'||d.user_name||'('||c.user_id||')</font>\r\n> 任务状态 : <font color=\"warning\">执行失败</font>'||'\r\n> 报错信息 : <font color=\"warning\">'||tool.sp_remote_exec_command_nopass($remote_command$cat $remote_command$||b.log_path||$remote_command$ | grep "\[ERROR\]\|等表超时" | awk -F' - ' '{print $2!=null ? $2 : $1}' | head -1 | sed 's/\"/\\\"/g'$remote_command$,split_part(b.host,':',1))||'</font>' as wechat_content
			from t_ds_process_instance a 
			inner join t_ds_task_instance b 
			on (a.id = b.process_instance_id)
			inner join t_ds_task_definition c 
			on (b.task_code = c.code and b.task_definition_version = c."version")
			inner join t_ds_user d 
			on (c.user_id = d.id)
			inner join t_ds_process_definition e 
			on (a.process_definition_code = e.code and a.process_definition_version = e."version")
			inner join t_ds_project g 
			on (e.project_code = g.code)
			where c.task_type <> 'SUB_PROCESS'
			    and a.state = 6
			    and b.state = 6
			    and a.id = new.id
        ) loop 
            v_content := i.wechat_content;
            v_message := $v_message${
    "msgtype":"markdown",
    "markdown": {
        "content":"$v_message$||v_content||$v_message$"
    }
}$v_message$;
            --告警
            perform tool.sp_send_wechat(v_message::json);
        end loop;
    end if;
    return new;
    exception when others then 
        return new;
end;
$function$
;
-- Permissions
ALTER FUNCTION dp.tg_ds_udef_alert_wechat() OWNER TO dp;
GRANT ALL ON FUNCTION dp.tg_ds_udef_alert_wechat() TO dp;
# 创建时候触发器
create trigger tg_state_ds_process_instance after update on dp.t_ds_process_instance for each row execute function dp.tg_ds_udef_alert_wechat();
\q

测试

  • 新建一个工作流,选择SQL组件

触发器实现海豚调度失败企业微信自动告警-LMLPHP

  • 保存工作流
    触发器实现海豚调度失败企业微信自动告警-LMLPHP

  • 上线工作流并运行工作流
    触发器实现海豚调度失败企业微信自动告警-LMLPHP

  • 工作流运行失败
    触发器实现海豚调度失败企业微信自动告警-LMLPHP

  • 随即企业微信来了消息提醒
    触发器实现海豚调度失败企业微信自动告警-LMLPHP
    触发器实现海豚调度失败企业微信自动告警-LMLPHP

08-05 00:48