#mysql导入数据
--全量替换数据
load data local infile "/home/python/data/stock.csv" replace into table stock CHARACTER SET gb2312 FIELDS TERMINATED BY "," LINES TERMINATED BY "\n";
--新增数据
load data local infile '/etl/etldata/input/20170216/industry_20170216.csv' into table ext_stock_industry fields terminated by "," lines terminated by '\n';
#将ext外表指向相应的目录
"ALTER TABLE ext_${tab_name} SET LOCATION 'hdfs://nameservice1/etldata/input/${tab_name}'"
## 清空STG增量表 ###
echo "`dt` :start truncate table STG_${tab_name}"
${TDH} "truncate table STG_${tab_name}"
[ $? -ne 0 ] && echo "`dt` :truncate table STG_${tab_name} failed " && exit -1
echo "`dt` :truncate Table STG_${tab_name} succeed "
## 将每日增量数据装入STG表 ###
echo "`dt` :start insert STG_${tab_name} from EXT_${tab_name}"
${TDH} "insert into STG_${tab_name} select A.*,${today} from EXT_${tab_name} A"
[ $? -ne 0 ] && echo "`dt` :insert STG_${tab_name} failed " && exit -1
echo "`dt` :insert into Table STG_${tab_name} succeed "
return 0
#清空调度环境
1.进入清空etl调度表脚本目录
cd /home/dainst/tmp/jjm/init
2.执行rollback.sh脚本清空环境
./rollback.sh
#配置作业流
1.配置db2驱动
在本机运行:
C:\Windows\SysWOW64\odbcad32.exe
数据源名称:dydwmm
用户标识:dainst
密码:dainst
IP:192.168.101.90
端口号:61000
2.打开excel提交作业流
#在本机打开excel文件
C:\Users\Idea\Desktop\数据项目\05-大数据平台\02-数据处理\ETL\大数据平台部署\06-相关文档
#删除右侧文件
分别提交3个sheet页
3.检查作业配置是否成功
#在waterdrop中执行如下操作检查:
SELECT * FROM etl.job_loc;
SELECT * FROM etl.job_metadata;
SELECT * FROM etl.job_seq;
#查看跑批日期
SELECT * FROM etl.job_biz_date;
4.清空历史目录
#清空省联社下发文件目录
rm -rf /etl/etldata/input/S-999000/CCRD/ADD/*
rm -rf /etl/etldata/input/S-999000/CORE/ADD/*
rm -rf /etl/etldata/input/S-999000/FMS/ADD/*
rm -rf /etl/etldata/input/S-999000/FMS/ADD/*
#清空dw数据库导出数据目录
rm -rf /etl/etldata/input/DW_TO_HIVE/*
5.开启调度
cd /etl/etldata/script/job-schd-engine-0.1Silver/bin
#先停止调度
mv nohup.out nohup.out_tmp
#配置调度启动时间
cd /etl/etldata/script/job-schd-engine-0.1Silver/config
vi quartz_jobs.xml
#开启调度
cd /etl/etldata/script/job-schd-engine-0.1Silver/bin
nohup start.sh &
6.前台查看调度页面
推送省联社下发文件
cd /home/dainst/tmp/jjm/Note/CORE
./tools.sh 20170221 FMS
./tools.sh 20170221 IMBS
./tools.sh 20170221 CCRD
./tools.sh 20170221 CORE
DW库中握手文件
cd /etl/etldata/input/DW_TO_HIVE/20170220
touch HANDFILE.OK
7.第一个批成功跑完
修改视图
drop view ETL.JOB_BIZ_DATE;
CREATE VIEW JOB_BIZ_DATE(
CUR_DT,
LAST_DT,
PPN_TSTMP
) AS SELECT
'2017-02-22' AS CUR_DT,
'2016-02-21' AS LAST_DT,
CURRENT TIMESTAMP AS PPN_TSTMP
FROM
CORE.BGFMCINF
WHERE
ETL_FLAG = 'I'
select * from ETL.JOB_BIZ_DATE
修改时间配置,实例化第二批次
cd /etl/etldata/script/job-schd-engine-0.1Silver/config
vi quartz_jobs.xml
创建压缩文件,并推送到对应目录上
cd /home/dainst/tmp/jjm/Note/CORE
./tools.sh 20170221 FMS
./tools.sh 20170221 IMBS
./tools.sh 20170221 CCRD
./tools.sh 20170221 CORE
查看前台调度页面
省联社文件解压成功,后续作业成功执行
1.在前台页面修改FILE_CHECK_CRM作业名称为FILE_CHECK_DW
调度管理-作业管理-作业场景选择FILE-查询
选中FILE_CHECK_CRM作业,点击编辑按钮
将FILE_CHECK_CRM作业的作业名称和互斥组改为FILE_CHECK_DW
select * from ETL.JOB_METADATA WHERE job_tp = 'FILE';
FILE_CHECK_CRM已经消失,变为FILE_CHECK_DW
select * from ETL.JOB_log WHERE job_nm LIKE 'FILE%';
4839948 WAITING FILE_CHECK_CRM 2017-02-21
2.给DW抽数据握手文件,观察调度能否继续执行
touch /etl/etldata/input/DW_TO_HIVE/20170221/HANDFILE.OK
如果实例化批之后,修改了作业名称,作业不能继续往下执行
在实例化作业之前,修改作业名称查看作业能否执行
该作业能继续执行,但依赖于该作业的后续作业也会执行
#一个作业依赖于一个不存在的作业,看看该作业能否执行???
CMD_CHK_COMMUNICATE_FILE作业依赖于FILE_CHECK_CRM,在未实例化批之前,
在前台将FILE_CHECK_CRM改名为FILE_CHECK_DW。
FILE_CHECK_DW能继续执行
CMD_CHK_COMMUNICATE_FILE也会继续执行。
#前台删除作业
修改视图
drop view ETL.JOB_BIZ_DATE;
CREATE VIEW JOB_BIZ_DATE(
CUR_DT,
LAST_DT,
PPN_TSTMP
) AS SELECT
'2017-02-20' AS CUR_DT,
'2016-02-19' AS LAST_DT,
CURRENT TIMESTAMP AS PPN_TSTMP
FROM
CORE.BGFMCINF
WHERE
ETL_FLAG = 'I'
select * from ETL.JOB_BIZ_DATE;
修改时间配置,实例化第二批次
cd /etl/etldata/script/job-schd-engine-0.1Silver/config
vi quartz_jobs.xml
创建压缩文件,并推送到对应目录上
cd /home/dainst/tmp/jjm/Note/CORE
./tools.sh 20170220 FMS
./tools.sh 20170220 IMBS
./tools.sh 20170220 CCRD
./tools.sh 20170220 CORE
4.
SELECT * FROM etl.job_seq WHERE PRE_JOB='CMD_CHK_COMMUNICATE_FILE';
update etl.job_seq set PRE_JOB='FILE_CHECK_DW' where PRE_JOB='CMD_CHK_COMMUNICATE_FILE';
SELECT * FROM etl.job_seq WHERE PRE_JOB='FILE_CHECK_DW';
-------------------------------------------
233数据库中:
SELECT count(*) FROM S_CRM_CUST_PE_BASE_INFO; --1829596条
90数据库中
SELECT count(*) FROM ext_s_crm_cust_pe_base_info; --1829596条
SELECT count(*) FROM stg_s_crm_cust_pe_base_info; --1829596条
SELECT count(*) FROM sdi_s_crm_cust_pe_base_info; --2005970条
SELECT count(*) FROM his_s_crm_cust_pe_base_info; --0条
重做作业:
再次查询数据库信息:
#备份bdpdb中数据库的s_crm_cust_pe_base_info表
drop table if exists sdi_s_crm_cust_pe_base_info_20170224;
CREATE table sdi_s_crm_cust_pe_base_info_20170224 AS SELECT * FROM sdi_s_crm_cust_pe_base_info;
select count(*) from sdi_s_crm_cust_pe_base_info_20170224;
drop table if exists stg_s_crm_cust_pe_base_info_20170224;
CREATE table stg_s_crm_cust_pe_base_info_20170224 AS SELECT * FROM stg_s_crm_cust_pe_base_info;
select count(*) from stg_s_crm_cust_pe_base_info_20170224;
drop table if exists ext_s_crm_cust_pe_base_info_20170224;
CREATE table ext_s_crm_cust_pe_base_info_20170224 AS SELECT * FROM ext_s_crm_cust_pe_base_info;
drop table if exists his_s_crm_cust_pe_base_info_20170224;
CREATE table his_s_crm_cust_pe_base_info_20170224 AS SELECT * FROM his_s_crm_cust_pe_base_info;
select count(*) from his_s_crm_cust_pe_base_info_20170224;
#创建新表
DROP TABLE SDI_S_CRM_CUST_PE_BASE_INFO;
CREATE TABLE IF NOT EXISTS SDI_S_CRM_CUST_PE_BASE_INFO (
CUST_NO VARCHAR(32) comment "CRM客户号",
CERT_TYPE VARCHAR(8) comment "证件类型(清洗后的)",
CERT_NO VARCHAR(20) comment "证件号(清洗后的)",
PECUST_NAME VARCHAR(80) comment "客户名称",
CUSTNAME_SHORT VARCHAR(40) comment "客户简称",
CUST_NAME_EN VARCHAR(100) comment "客户名称(英文)",
VIP_CATE VARCHAR(8) comment "贵宾类别",
COUNTRY VARCHAR(8) comment "国家",
CREDIT_LEVEL VARCHAR(8) comment "信用等级",
IS_FREETAX VARCHAR(8) comment "免税标识",
BTFLBOOL VARCHAR(8) comment "移行客户标志",
DIMABOOL VARCHAR(8) comment "是否接收推广信息",
SEX VARCHAR(8) comment "性别",
NTY VARCHAR(8) comment "民族",
MRG VARCHAR(8) comment "婚姻状况",
BTD VARCHAR(10) comment "出生日期",
STUDY_EXP VARCHAR(8) comment "最高学历",
DEGREE VARCHAR(8) comment "最高学位",
PAY_ACCT VARCHAR(32) comment "工资账号",
ACCT_WT_BK VARCHAR(80) comment "开户银行",
YEAR_INCOME DECIMAL(16,2) comment "年收入",
FMY_PPL INTEGER comment "家庭人数",
INHBT_STAT VARCHAR(8) comment "居住状况",
CUST_LEV VARCHAR(8) comment "客户层次",
BANK_EMP_IND VARCHAR(8) comment "本行员工标志",
EMPLOYEE_TYP VARCHAR(8) comment "员工类别",
BANK_STK_HOLDER_IND VARCHAR(8) comment "是否本行股东",
BANK_PARTY_IND VARCHAR(8) comment "是否本行关系人",
NOTES VARCHAR(200) comment "本行关系人备注",
XDZX_TYPE VARCHAR(8) comment "客户信贷类型",
CUST_QLY VARCHAR(8) comment "客户性质",
PERSONAL_INSRC VARCHAR(8) comment "人身保险",
INSURANCE_DT VARCHAR(10) comment "保险期限",
BAD_RECORD DECIMAL(5,0) comment "不良记录笔数",
IS_LTC_CREDIT_CUST VARCHAR(8) comment "是否潜在信贷客户",
NATIVE VARCHAR(200) comment "籍贯",
EFF_DATE VARCHAR(8) comment "批量日期",
END_DATE VARCHAR(8) comment "数据失效日期",
JOB_SEQ_ID VARCHAR(8) comment "批次号"
) comment "对私客户基本信息"
CLUSTERED BY (CUST_NO) INTO 11 BUCKETS
STORED AS ORC
TBLPROPERTIES ("transactional"="true");
DROP TABLE STG_S_CRM_CUST_PE_BASE_INFO;
CREATE TABLE IF NOT EXISTS STG_S_CRM_CUST_PE_BASE_INFO (
CUST_NO string comment "CRM客户号",
CERT_TYPE string comment "证件类型(清洗后的)",
CERT_NO string comment "证件号(清洗后的)",
PECUST_NAME string comment "客户名称",
CUSTNAME_SHORT string comment "客户简称",
CUST_NAME_EN string comment "客户名称(英文)",
VIP_CATE string comment "贵宾类别",
COUNTRY string comment "国家",
CREDIT_LEVEL string comment "信用等级",
IS_FREETAX string comment "免税标识",
BTFLBOOL string comment "移行客户标志",
DIMABOOL string comment "是否接收推广信息",
SEX string comment "性别",
NTY string comment "民族",
MRG string comment "婚姻状况",
BTD string comment "出生日期",
STUDY_EXP string comment "最高学历",
DEGREE string comment "最高学位",
PAY_ACCT string comment "工资账号",
ACCT_WT_BK string comment "开户银行",
YEAR_INCOME string comment "年收入",
FMY_PPL string comment "家庭人数",
INHBT_STAT string comment "居住状况",
CUST_LEV string comment "客户层次",
BANK_EMP_IND string comment "本行员工标志",
EMPLOYEE_TYP string comment "员工类别",
BANK_STK_HOLDER_IND string comment "是否本行股东",
BANK_PARTY_IND string comment "是否本行关系人",
NOTES string comment "本行关系人备注",
XDZX_TYPE string comment "客户信贷类型",
CUST_QLY string comment "客户性质",
PERSONAL_INSRC string comment "人身保险",
INSURANCE_DT string comment "保险期限",
BAD_RECORD string comment "不良记录笔数",
IS_LTC_CREDIT_CUST string comment "是否潜在信贷客户",
ETL_BIZ_DT string comment "ETL插入日期",
ETL_LOAD_DT string comment "ETL更新日期",
NATIVE string comment "籍贯",
ETL_EFF_DATE VARCHAR(8) comment "批量日期"
) comment "对私客户基本信息"
CLUSTERED BY (CUST_NO) INTO 11 BUCKETS
STORED AS ORC
TBLPROPERTIES ("transactional"="true");
DROP TABLE HIS_S_CRM_CUST_PE_BASE_INFO;
CREATE TABLE IF NOT EXISTS HIS_S_CRM_CUST_PE_BASE_INFO (
CUST_NO VARCHAR(32) comment "CRM客户号",
CERT_TYPE VARCHAR(8) comment "证件类型(清洗后的)",
CERT_NO VARCHAR(20) comment "证件号(清洗后的)",
PECUST_NAME VARCHAR(80) comment "客户名称",
CUSTNAME_SHORT VARCHAR(40) comment "客户简称",
CUST_NAME_EN VARCHAR(100) comment "客户名称(英文)",
VIP_CATE VARCHAR(8) comment "贵宾类别",
COUNTRY VARCHAR(8) comment "国家",
CREDIT_LEVEL VARCHAR(8) comment "信用等级",
IS_FREETAX VARCHAR(8) comment "免税标识",
BTFLBOOL VARCHAR(8) comment "移行客户标志",
DIMABOOL VARCHAR(8) comment "是否接收推广信息",
SEX VARCHAR(8) comment "性别",
NTY VARCHAR(8) comment "民族",
MRG VARCHAR(8) comment "婚姻状况",
BTD VARCHAR(10) comment "出生日期",
STUDY_EXP VARCHAR(8) comment "最高学历",
DEGREE VARCHAR(8) comment "最高学位",
PAY_ACCT VARCHAR(32) comment "工资账号",
ACCT_WT_BK VARCHAR(80) comment "开户银行",
YEAR_INCOME DECIMAL(16,2) comment "年收入",
FMY_PPL INTEGER comment "家庭人数",
INHBT_STAT VARCHAR(8) comment "居住状况",
CUST_LEV VARCHAR(8) comment "客户层次",
BANK_EMP_IND VARCHAR(8) comment "本行员工标志",
EMPLOYEE_TYP VARCHAR(8) comment "员工类别",
BANK_STK_HOLDER_IND VARCHAR(8) comment "是否本行股东",
BANK_PARTY_IND VARCHAR(8) comment "是否本行关系人",
NOTES VARCHAR(200) comment "本行关系人备注",
XDZX_TYPE VARCHAR(8) comment "客户信贷类型",
CUST_QLY VARCHAR(8) comment "客户性质",
PERSONAL_INSRC VARCHAR(8) comment "人身保险",
INSURANCE_DT VARCHAR(10) comment "保险期限",
BAD_RECORD DECIMAL(5,0) comment "不良记录笔数",
IS_LTC_CREDIT_CUST VARCHAR(8) comment "是否潜在信贷客户",
NATIVE VARCHAR(200) comment "籍贯",
EFF_DATE VARCHAR(8) comment "批量日期",
END_DATE VARCHAR(8) comment "数据失效日期",
JOB_SEQ_ID VARCHAR(8) comment "批次号"
,NEW_JOB_SEQ_ID VARCHAR(8) comment "历史批次号"
) comment "对私客户基本信息"
CLUSTERED BY (CUST_NO) INTO 11 BUCKETS
STORED AS ORC
TBLPROPERTIES ("transactional"="true");
DROP TABLE EXT_S_CRM_CUST_PE_BASE_INFO;
CREATE EXTERNAL TABLE IF NOT EXISTS EXT_S_CRM_CUST_PE_BASE_INFO (
CUST_NO string comment "CRM客户号",
CERT_TYPE string comment "证件类型(清洗后的)",
CERT_NO string comment "证件号(清洗后的)",
PECUST_NAME string comment "客户名称",
CUSTNAME_SHORT string comment "客户简称",
CUST_NAME_EN string comment "客户名称(英文)",
VIP_CATE string comment "贵宾类别",
COUNTRY string comment "国家",
CREDIT_LEVEL string comment "信用等级",
IS_FREETAX string comment "免税标识",
BTFLBOOL string comment "移行客户标志",
DIMABOOL string comment "是否接收推广信息",
SEX string comment "性别",
NTY string comment "民族",
MRG string comment "婚姻状况",
BTD string comment "出生日期",
STUDY_EXP string comment "最高学历",
DEGREE string comment "最高学位",
PAY_ACCT string comment "工资账号",
ACCT_WT_BK string comment "开户银行",
YEAR_INCOME string comment "年收入",
FMY_PPL string comment "家庭人数",
INHBT_STAT string comment "居住状况",
CUST_LEV string comment "客户层次",
BANK_EMP_IND string comment "本行员工标志",
EMPLOYEE_TYP string comment "员工类别",
BANK_STK_HOLDER_IND string comment "是否本行股东",
BANK_PARTY_IND string comment "是否本行关系人",
NOTES string comment "本行关系人备注",
XDZX_TYPE string comment "客户信贷类型",
CUST_QLY string comment "客户性质",
PERSONAL_INSRC string comment "人身保险",
INSURANCE_DT string comment "保险期限",
BAD_RECORD string comment "不良记录笔数",
IS_LTC_CREDIT_CUST string comment "是否潜在信贷客户",
ETL_BIZ_DT string comment "ETL插入日期",
ETL_LOAD_DT string comment "ETL更新日期",
NATIVE string comment "籍贯"
) comment "对私客户基本信息"
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.csv.serde.CSVSerde'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION '/etldata';
跑20170220批数据,查询数据量
#查看新表数据量
SELECT count(*) FROM ext_s_crm_cust_pe_base_info; --1829596
SELECT count(*) FROM stg_s_crm_cust_pe_base_info; --1829696
SELECT count(*) FROM sdi_s_crm_cust_pe_base_info; --1829596
SELECT count(*) FROM his_s_crm_cust_pe_base_info; --0
重跑20170220批作业,所有作业成功跑批,再次查看数据量
SELECT count(*) FROM ext_s_crm_cust_pe_base_info; --1829596
SELECT count(*) FROM stg_s_crm_cust_pe_base_info; --1829696
SELECT count(*) FROM sdi_s_crm_cust_pe_base_info; --1829596
SELECT count(*) FROM his_s_crm_cust_pe_base_info; --0
create or replace procedure bdpdb.sp_idv_cst_smy_bk_tl_busi()
IS
trc_tab STRING
BEGIN
trc_tab:="truncate table bdpdb.IDV_CST_SMY_BK_TL_BUSI ;";
EXECUTE IMMEDIATE trc_tab;
insert into bdpdb.IDV_CST_SMY_BK_TL_BUSI
select date_dt --日期
,branch_id --机构号
,counter_no --柜员号
,tel_txid --交易源代码
,NVL(LAG(tran_time,1) OVER(partition by date_dt,counter_no,branch_id,process_time,process_end_time
order by date_dt,branch_id,counter_no,process_time,process_end_time,tran_time),process_time) start_time --交易起始时间
,tran_time --交易结束时间
from(
select A.date_dt --日期
,A.counter_no --柜员号
,A.branch_id --机构号
,B.tel_txid --交易源代码
,A.process_time --叫号起始时间
,B.tran_time --交易时间
,A.process_end_time --叫号结束时间
from
(
select
substr(A.process_time,1,4)||substr(A.process_time,6,2)||substr(A.process_time,9,2) date_dt
,A.BRANCH_ID
,A.WIN_NO
,B.counter_no
,TDH_TODATE(TIMESTAMP(A.PROCESS_TIME) + interval '' SECOND,'yyyy-MM-dd HH:mm:ss','yyyyMMddHHmmss') PROCESS_TIME --叫号时间
,TDH_TODATE(TIMESTAMP(A.process_end_time) + interval '' SECOND,'yyyy-MM-dd HH:mm:ss','yyyyMMddHHmmss') process_end_time--业务办理结束时间
from bdpdb.SDI_T_CUST_QUEUE A
left join ( --去除一个柜员多个窗口情况
SELECT A.date_dt,A.department BRANCH_ID ,B.win_no,A.counter_no
FROM (
select date_dt
,department
,counter_no
from bdpdb.SDI_BH_COUNTER
GROUP BY date_dt,department,counter_no
having count(win_no)=1
) A
LEFT JOIN bdpdb.SDI_BH_COUNTER B
ON A.date_dt=B.date_dt AND A.department=B.department
AND A.counter_no=B.counter_no
order BY 1,2,3,4
) B
on substr(A.process_time,1,4)||substr(A.process_time,6,2)||substr(A.process_time,9,2)=B.date_dt
and A.BRANCH_ID=B.BRANCH_ID and A.WIN_NO=B.WIN_NO
where length(B.counter_no)=7
order by 1,2,3,4,5,6
) A
right join (
select distinct SYS_DATE,TEL_ID,TEL_TXID,TRAN_TIME
from ( select
SYS_DATE --交易日期
,TEL_ID --交易柜员号
,TEL_TXID --柜员输入原交易码
,SYS_DATE||SYS_TIME TRAN_TIME--交易时间
from bdpdb.SDI_F_CORE_BYFTJRN
where tx_ouno like '89120%' and substr(TEL_ID,1,3)=''
AND SYS_DATE>='' and SYS_DATE<=''
order by 1,2,4,3)
)
B
on A.date_dt=B.sys_date and A.counter_no=B.tel_id
and B.tran_time >= A.process_time
and B.tran_time <= A.process_end_time
where A.date_dt is not NULL
order by 1,2,3,5,7,6,4
)
END