使用到工具: 1. 存储 mongodb 2. 定时任务 crontab 数据存储空间( database = fs )说明: collection = functions(方法定义) dispatch(调度定义) joblist(待运行任务集) result(结果)0. 调度逻辑 : × 通过 定时任务 导入日志到 mongodb × 根据个性统计需求,写 分析脚本 ( functions ) - 方法定义 ,参数说明 × 再 定义定时调度(dispatch) - 指定 定时周期 ,运行方法 × crontab 触发 调度集合 生成job类表,再另一个crontab运行 joblist中的已经实例化好的任务 × 结果存储到 result 中*/2 * * * * sh /data/shell/gmodel/dw_model/cron_mongodb.sh joblist_create 1,3,5,7,9,11,13,15,17,19,21,23,25,27,29,31,33,35,37,39,41,43,45,47,49,51,53,55,57,59 * * * * sh /data/shell/gmodel/dw_model/cron_mongodb.sh job_run 定时 系统调度 :#!/bin/shact=$1echo "-$act----------------------- " >> /tmp/mongo.logif [ $act = "joblist_create" ]; then echo "joblist_create run .." >> /tmp/mongo.log echo `ps -ef|grep -v grep |grepcron_mongodb |grep joblist_create|wc -l` ; if [ `ps -ef|grep -v grep |grepcron_mongodb |grep joblist_create|wc -l` -lt "5" ] ;then /root/tools/mongodb-linux-x86_64-1.4.0/bin/mongo --eval " load('/data/shell/gmodel/dw_model/utils/utils.js'); print( 'joblist_create ',new Date,tojson(joblist_create()) ); " >> /tmp/mongo.log fifiif [ $act = "job_run" ]; then echo "job_run run .." >> /tmp/mongo.log if [ `ps -ef|grep -v grep |grepcron_mongodb |grep job_run|wc -l` -lt "5" ] ;then /root/tools/mongodb-linux-x86_64-1.4.0/bin/mongo --eval " load('/data/shell/gmodel/dw_model/utils/utils.js'); print( 'job_run ',new Date,tojson( job_run() ) ); " >> /tmp/mongo.log fifi1. 初始化( init_mong.js ) collection : var cfs = db.getMongo().getDB('fs').createCollection("functions") ;var dfs = db.getMongo().getDB('fs').createCollection("dispatch") ;var jfs = db.getMongo().getDB('fs').createCollection("joblist") ;var rfs = db.getMongo().getDB('fs').createCollection("result") ;var fs = db.getMongo().getDB('fs').functions ;var ds = db.getMongo().getDB('fs').dispatch ;var js = db.getMongo().getDB('fs').joblist ;var rs = db.getMongo().getDB('fs').result ;//方法名词 和版本号 联合 唯一fs.ensureIndex({ "name":1,"version":1}, {unique: true});//调度名称 唯一ds.ensureIndex({ "name":1 }, {unique: true});//任务list : 结果 = 名称 , 时间戳 , 动作戳( 方法参数 ) 联合唯一js.ensureIndex({ "name":1,"active_stamp":1,"timestamp":1 }, {unique: true});rs.ensureIndex({ "name":1,"active_stamp":1,"timestamp":1 }, {unique: true});/** */fs.drop();ds.drop();rs.drop() ;js.drop();2. function 定义 :fs.save({ // 包名 "package" : "statistics" , // function 名称 - 通过 utils 的 getf('statistics','test_statistics') 取得 "name" : "test_statistics" , // 描述 "desc" : "测试 调度,没有实现用处" , // 方法 版本号 getf('statistics','test_statistics','0.0') // 如果 getf 没有第3个参数 ,默认取最后一个版本 "version" : "0.0" , // 方法运行 参数 描述 "param":{ "ab":"产品ID", "st":"开始时间", "et":"结束时间" }, // 方法体 运行 取传参 为 this.param.ab .... "body" : function(){ sleep(15000); return {"ab":this.param.ab,"st":this.param.st,"et":this.param.et}; }});3. 调度 定义 :ds.save({ // 调度 名称 描述 "name" : "测试-1" , "desc" : "" , // 方法版本 和 方法名 "version" : "0.0" , "fun_name" : "test_statistics" , // 方法 参数 逻辑定义 "param" : { //调度当前时间 可以使用 内置 "dispatch_time" new Date , //并且 可以使用 utils 中的方法 // 其 当参数 遇到 @ - // 就会 load('utils.js') ; param.st = eval( .... ) ; "ab": "100008" , "st": "@ time_displacement( dispatch_time ,'yyyy-MM-dd',-1 ) ", "et": "@ time_displacement( dispatch_time ,'yyyy-MM-dd' ) " }, //定时调度 仿照( crontab ) "run_timing" : { "minute" : "00" , //0-59 "hour" : "*" , //0-23 "day" : "*" , //1-31 "weekday" : "*" //'Sunday','Monday','Tuesday','Wednesday','Thursday','Friday','Saturday' }, // 调度当前是否可用 "is_use" : "true", // 此调度生成 job 最后次时间 // 作用 - 异常处理,可以通过 此属性,补没有生成的 joblist "last_run" : "2010-04-21 00:00"});4. utils.js 时间操作 工具 http://www.mattkruse.com/javascript/date/source.html在其 load 上 添加一个 方法 /* 时间位移* time_displacement( new Date,'yyyy-MM-dd',-1 )*/function time_displacement(ndt,fmt,day,hour,min){ if(typeof ndate == "string") ndt = parseDate(ndt) ; var ndate = new Date( ndt ); if(typeof day != "undefined") ndate.setDate( ndate.getDate()+day ); if(typeof hour != "undefined") ndate.setHours( ndate.getHours()+hour ); if(typeof min != "undefined") ndate.setMinutes( ndate.getMinutes()+min ); return formatDate(ndate,fmt);}5. utils.js 方法 function 取得/*** var fun = getf("assert","structure");* fun.body fun.varsion*/function getf(pack_age,fname,vers){ var dbs = 'fs'; var fs_coll = db.getMongo().getDB(dbs).functions ; if(typeof vers == "undefined") var fun = eval(' fs_coll.find({ "package":"'+pack_age+'", "name":"'+fname+'" }).sort({"version":-1})[0] '); else var fun = eval(' fs_coll.findOne({ "package":"'+pack_age+'", "name":"'+fname+'","version":"'+vers+'" }) '); fun._id = "~"; return fun ;}6. 调度核心方法之一 : 调度时间戳生成 /* 生成 时间戳 : 根据 输入 时间差 , 根据 定时 minute,hour,day,weekday 中查看是否有可能,触发运行 比如 : 2010-04-10 00:01 2010-04-11 23:59 在 00 10 * * 中 有 两次次触发 方法运行 : get_timestamps( '2010-04-10 00:01','2010-04-11 23:59','00','10','*','*' ) 就是 2010-04-10 10:00 , 2010-04-11 10:00 参数格式说明 : stime,etime > new Date \ 'yyyy-MM-dd' minute,hour,day = \d\d weekday = 'Sunday','Monday','Tuesday','Wednesday','Thursday','Friday','Saturday'*/function get_timestamps( stime,etime, minute,hour,day,weekday ){ if(typeof stime == "string") stime = parseDate(stime); if(typeof etime == "string") etime = parseDate(etime); if(stime.getTime() > etime.getTime() )return []; var et = formatDate(etime,'yyyy-MM-dd HH:mm'); var min_cycle = null ; if( minute=="*" ) min_cycle = 1 ; else if( min_cycle == null && hour=="*" ) min_cycle = 1*60 ; else if ( min_cycle == null ) min_cycle = 1*60*24 ; print( "min_cycle = ",min_cycle ); var tts = {}; //stime.setMinutes( stime.getMinutes() + min_cycle ) ; while( stime.getTime() etime.getTime() ){ var st = formatDate(stime,'yyyy-MM-dd HH:mm'); if( minute != "*" && minute!="" ) st = st.replace(/(.*)(\d{2})$/,"$1"+minute); if( hour != "*" && hour!="" ) st = st.replace(/(.*)(\d{2})(:\d{2})$/,"$1"+hour+"$3" ) ; if( day != "*" && day!="" )st = st.replace(/(\d{4}-\d{2}-)(\d{2})(.*)/,"$1"+day+"$3") ; if( ( weekday != "*" && weekday != ww ) || ( et st) ) { stime.setMinutes( stime.getMinutes() + min_cycle ) ; continue ; } // yyyy-MM-dd HH:mm tts[ st ] = 0; stime.setMinutes( stime.getMinutes() + min_cycle ) ; } var arr = []; for(var c in tts ) { arr.push(c); } return arr ;}joblist 生成 :/* 生成调度 ntime 调度运行 环境时间 默认当前joblist 生成 格式{ "_id" : ObjectId("4bd55d4a6d623399d78fd793"), "name" : "测试-1", "timestamp" : "2010-04-22 00:00", "active_stamp" : "", "func_obj" : { "_id" : "~", "package" : "statistics", "name" : "test_statistics", "desc" : "测试 调度,没有实现用处", "version" : "0.0", "param" : { "ab" : "100008", "st" : "2010-04-21", "et" : "2010-04-22" }, "body" : function cf__24__f_cf__12__f_() { sleep(15000); return {ab:this.param.ab, st:this.param.st, et:this.param.et}; } }, "running_time" : { "dispatch_start" : "", "start" : "", "end" : "" }, "level" : "3", "run_status" : "init", "result" : ""}*/function joblist_create(ntime){ if(typeof ntime == "undefined") var nt = new Date ; else if ( typeof ntime == "object" ) var nt = ntime ; else var nt = parseDate(ntime); var run_stat = {"all":0, "success":0,"error":0}; var dbs = 'fs'; var ds = db.getMongo().getDB(dbs).dispatch ; var js = db.getMongo().getDB(dbs).joblist ; var ff = ds.find({"is_use":"true"}) ; //遍历出所有 可用 调度定义 while(ff.hasNext()){ run_stat.all += 1 ; var disp = ff.next(); try{ //取得调度方法体 var fun = getf('statistics',disp.fun_name,disp.version); //调度最晚生成 joblist 时间 if(typeof disp.last_run == "undefined" || disp.last_run=="" ) disp.last_run = formatDate(nt,'yyyy-MM-dd HH:mm:ss'); //根据 调度 最晚生成 joblist 时间 和 当前ntime 时间,取得 job 运行时间 var tks = get_timestamps( disp.last_run , nt , disp.run_timing.minute,disp.run_timing.hour,disp.run_timing.day,disp.run_timing.weekday ) ; for(var i=0;itks.length;i++ ){ // 调度参数生成中的 隐含对象 生成 var dispatch_time = parseDate( tks[i] ) ; fun = getf('statistics',disp.fun_name,disp.version); var pks = []; // 参数实例化 for(var pk in fun.param){ pks.push(pk); var pv = disp.param[pk] ; if( /@/.test(pv) ) fun.param[pk] = eval( pv.replace(/^@(.*)$/,"$1") ); else fun.param[pk]=pv ; } // 生成动作戳 var active_stamp = "" ; var pkss = pks.sort(); for(var ii;iipkss.length;ii++){ active_stamp += pkss[ii]+fun.param[pkss[ii]]; } if( js.count({ "name": disp.name,"active_stamp":active_stamp ,"timestamp":tks[i]}) != 0 ) continue ; js.save({ "name": disp.name , "timestamp" : tks[i] , "active_stamp":active_stamp, "func_obj" : fun , "running_time":{ "dispatch_start" : "" , "start" : "" , "end" : "" }, "level" : "3" , "run_status" : "init" ,// 初始等待 init ,计算等待 wait , 运行中 running , 异常结束 error ,正常结束 end "result" : "" // 结果去向 - 扩展 }); run_stat.success += 1 ; disp.last_run = tks[i] ; } ds.save( disp ); }catch(err){ print(err); run_stat.error += 1 ;} } return run_stat ;}job_run :// 运行function job_run(ntime){ if(typeof ntime == "undefined") var nt = new Date ; else if ( typeof ntime == "object" ) var nt = ntime ; else var nt = parseDate(ntime); var run_stat = {"all":0,"success":0,"error":0}; var dbs = 'fs'; var ds = db.getMongo().getDB(dbs).dispatch ; var js = db.getMongo().getDB(dbs).joblist ; var rs = db.getMongo().getDB(dbs).result ; var ajob = []; var ff = js.find({ "run_status":"init" }).sort({ "level":1 }).limit(5) ; var dispatch_start = formatDate(nt,'yyyy-MM-dd HH:mm'); while(ff.hasNext()){ run_stat.all += 1 ; try{ var job = ff.next(); job.run_status = "wait" ; job.running_time.dispatch_start = dispatch_start ; js.save(job); ajob.push(job); }catch(err){print(err);} } print("start run = ",run_stat.all ); for(var i=0;iajob.length;i++){ try{ var job = ajob[i] ; if( rs.count({ "name":job.name,"active_stamp":job.active_stamp ,"timestamp":job.timestamp }) != 0 ) continue ; var fun = job.func_obj; job.run_status = "running" ; job.running_time.start = formatDate(nt,'yyyy-MM-dd HH:mm'); js.save(job); var res = fun.body() ; rs.save({ "name" : job.name , "param" : fun.param , "timestamp" : job.timestamp , "active_stamp":job.active_stamp , "result" : res }); job.running_time.end = formatDate(nt,'yyyy-MM-dd HH:mm'); job.run_status = "end" ; js.save(job); run_stat.success += 1 ; }catch(err){ print(err); job.run_status = "error" ; job.running_time.end = formatDate(nt,'yyyy-MM-dd HH:mm'); js.save(job); run_stat.error += 1 ; } } return run_stat ;} 10-10 01:00