# Ad-hoc MySQL report queries, originally four Zeppelin %pyspark paragraphs
# (separated by "====" lines).  They now run top to bottom as one script:
#   1. total streaming hours per performer over a date range
#   2. certified performers added in a date range, with their streaming hours
#   3. channel nicknames for a fixed list of user IDs
#   4. date/timestamp formatting for a daily job
# Each section rebinds the module-level connection settings (optmap/optmap1)
# before it runs, exactly as each notebook paragraph did; sql_select and
# sql_select1 read those globals at call time.
# Written to run on both Python 2 (the original interpreter) and Python 3.
from __future__ import print_function

import datetime
import os
import re
import sys
import time
from contextlib import closing

import mysql.connector
import pandas as pd  # noqa: F401 -- unused here; kept from the original notebook

# PERFORMERSHOWTIMERECORD<N> and CHARBASE<N> are split into ten tables,
# selected by id % 10.
SHARD_COUNT = 10

_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'


def _to_timestamp(text):
    """Convert 'YYYY-MM-DD HH:MM:SS' (local time) to an integer Unix timestamp."""
    return int(time.mktime(time.strptime(text, _TIME_FORMAT)))


def _run_query(conf, reqsql, params=None):
    """Execute one query on the server described by *conf*; return all rows.

    Args:
        conf: dict with the keys dbuser, dbpass, dbhost, dbport, dbname.
        reqsql: SQL text; %s placeholders are bound from *params*.
        params: optional sequence of values bound by the driver.

    Returns:
        The list from cursor.fetchall(), or [] if mysql.connector raised an
        error (the error is printed so report loops keep going).

    The cursor and the connection are always closed, even on error.
    """
    try:
        with closing(mysql.connector.connect(user=conf['dbuser'],
                                             password=conf['dbpass'],
                                             host=conf['dbhost'],
                                             port=conf['dbport'],
                                             database=conf['dbname'])) as db_conn:
            with closing(db_conn.cursor()) as db_cursor:
                db_cursor.execute(reqsql, params)
                return db_cursor.fetchall()
    except mysql.connector.Error as e:
        print('Error : {}'.format(e))
        return []


def sql_select(reqsql, params=None):
    """Run *reqsql* (with optional bound *params*) using the global optmap."""
    return _run_query(optmap, reqsql, params)


def sql_select1(reqsql1, params=None):
    """Run *reqsql1* (with optional bound *params*) using the global optmap1."""
    return _run_query(optmap1, reqsql1, params)


def _decode_gbk_nickname(raw):
    """Turn a nickname column value into readable text.

    Mirrors the original processing: every character is treated as one byte
    (i.e. Latin-1) and the resulting bytes are decoded as GBK.  Presumably the
    column stores GBK bytes that the driver decodes as Latin-1 -- confirm.
    Bytes values are decoded directly.  None, and text that cannot be Latin-1
    (already real Unicode), are returned unchanged.  Invalid GBK sequences are
    replaced instead of aborting the whole report.
    """
    if raw is None:
        return None
    if isinstance(raw, (bytes, bytearray)):
        data = bytes(raw)
    else:
        try:
            data = raw.encode('latin-1')
        except UnicodeEncodeError:
            return raw
    return data.decode('gbk', 'replace')


# ---------------------------------------------------------------------------
# Section 1: streaming hours per performer.
# ---------------------------------------------------------------------------
# NOTE(review): credentials are hard-coded in source; move them to a config
# file or environment variables.
optmap = {'dbuser': 'haoren', 'dbpass': 'G4d', 'dbhost': '172.12.112.5',
          'dbport': 3306, 'dbname': 'GMDB'}


def renzhengsingger(startday, endday):
    """Print "PERFORMERID hours" for every performer with shows in the window.

    Queries the ten tables PERFORMERSHOWTIMERECORD0..9 through sql_select
    (connection from the global optmap), for STARTTIME in [startday, endday),
    and prints each table's rows in table order.

    Args:
        startday: window start, 'YYYY-MM-DD HH:MM:SS' local time (inclusive).
        endday: window end, same format (exclusive).
    """
    t1 = _to_timestamp(startday)
    t2 = _to_timestamp(endday)
    for shard in range(SHARD_COUNT):
        # NOTE(review): the source text after "STARTTIME" was lost; the "<"
        # upper bound and GROUP BY (needed because rows are printed per
        # performer) are reconstructed -- confirm against the notebook.
        reqsql = ("select PERFORMERID,sum(DURATION)/3600 "
                  "from PERFORMERSHOWTIMERECORD{0} "
                  "where STARTTIME >=%s and STARTTIME <%s "
                  "group by PERFORMERID").format(shard)
        for performer_id, hours in sql_select(reqsql, (t1, t2)):
            print(performer_id, hours)


renzhengsingger('2017-08-01 00:00:00', '2017-09-01 00:00:00')

# ---------------------------------------------------------------------------
# Section 2: certified performers added in a window, with streaming hours.
# ---------------------------------------------------------------------------
optmap = {'dbuser': 'haoren', 'dbpass': 'G4d', 'dbhost': '172.12.112.8',
          'dbport': 3306, 'dbname': 'IMDB'}
optmap1 = {'dbuser': 'haoren', 'dbpass': 'G4d', 'dbhost': '172.12.112.5',
           'dbport': 3306, 'dbname': 'GMDB'}


def renzhengsingger(startday, endday):  # noqa: F811 -- replaces section 1's version
    """Print "PERFORMERID , added_at , hours" for performers added in the window.

    Performers come from PERFORMERINFO (via sql_select / optmap) with ADDTIME
    in [startday, endday).  For each one, the hours summed from its table
    PERFORMERSHOWTIMERECORD<id % 10> (via sql_select1 / optmap1) over the
    same window are printed; hours is None when there is no row.

    Args:
        startday: window start, 'YYYY-MM-DD HH:MM:SS' local time (inclusive).
        endday: window end, same format (exclusive).
    """
    t1 = _to_timestamp(startday)
    t2 = _to_timestamp(endday)
    # NOTE(review): "<" upper bound reconstructed (source truncated after ADDTIME).
    performers = sql_select(
        "select PERFORMERID,from_unixtime(ADDTIME) from PERFORMERINFO "
        "where ADDTIME >=%s and ADDTIME <%s", (t1, t2))
    for performer_id, added_at in performers:
        shard = int(performer_id) % SHARD_COUNT
        # NOTE(review): the upper bound and the PERFORMERID filter were
        # truncated in the source.  They are reconstructed because the single
        # summed value is reported against this performer.
        reqsql1 = ("select sum(DURATION)/3600 "
                   "from PERFORMERSHOWTIMERECORD{0} "
                   "where STARTTIME >=%s and STARTTIME <%s "
                   "and PERFORMERID=%s").format(shard)
        rows = sql_select1(reqsql1, (t1, t2, performer_id))
        hours = rows[0][0] if rows else None
        print(performer_id, ",", added_at, ",", hours)


renzhengsingger('2017-08-01 00:00:00', '2017-09-01 00:00:00')

# ---------------------------------------------------------------------------
# Section 3: channel nicknames for a list of user IDs.
# ---------------------------------------------------------------------------
optmap = {'dbuser': 'haoren', 'dbpass': 'G4d', 'dbhost': '172.17.12.8',
          'dbport': 3306, 'dbname': 'IMDB'}
optmap1 = {'dbuser': 'haoren', 'dbpass': 'G4d', 'dbhost': '172.17.12.5',
           'dbport': 3306, 'dbname': 'GMDB'}


def getnickname(uid):
    """Return the rows (CHANNELNICKNAME,) for user *uid* from CHARBASE<uid % 10>.

    Uses sql_select (global optmap).  The raw value may need
    _decode_gbk_nickname before it displays correctly.
    """
    uid = int(uid)
    reqsql = ("select CHANNELNICKNAME from CHARBASE{0} "
              "where ID=%s").format(uid % SHARD_COUNT)
    return sql_select(reqsql, (uid,))


userlist = [49072058, 54332450, 23154709]  # user IDs to look up
for user_id in userlist:
    nick_rows = getnickname(user_id)
    nick_name = _decode_gbk_nickname(nick_rows[0][0]) if nick_rows else None
    print(user_id, nick_name)

# ---------------------------------------------------------------------------
# Section 4: helpers for a daily job.
# (The paragraph's MySQLdb-based sql_select was redefined before any call and
# used Python-2-only syntax; the shared mysql.connector helper above is the
# version that was actually in effect.)
# ---------------------------------------------------------------------------
optmap = {'dbuser': 'haoren', 'dbpass': 'qwomet', 'dbhost': '172.17.12.8',
          'dbport': 3306, 'dbname': 'IMDB'}
optmap1 = {'dbuser': 'haoren', 'dbpass': 'qwomet', 'dbhost': '172.17.12.5',
           'dbport': 3306, 'dbname': 'GMDB'}


def get_files(dir, pattern):
    """List the entry names in directory *dir* that match regex *pattern*.

    Returns:
        A list of matching names (re.search semantics), or the string 'no'
        when *dir* does not exist.
    """
    if not os.path.exists(dir):
        return 'no'
    return [name for name in os.listdir(dir) if re.search(pattern, name)]


def main():
    """Print the day being processed in several date/time formats.

    cur_day defaults to yesterday (YYYYMMDD); a first command-line argument
    overrides it.  log_day is always yesterday as YYMMDD.  Then prints
    tabletime, date (YYYY-MM-DD), datetmp (date + " 00:00:00"), the parsed
    struct_time, and the Unix timestamp of that local midnight.
    """
    # Calendar arithmetic rather than "now - 86400 s", so a DST change
    # cannot move the result onto the wrong day.
    yesterday = datetime.date.today() - datetime.timedelta(days=1)
    cur_day = yesterday.strftime('%Y%m%d')   # e.g. 20170920
    print("cur_day=", cur_day)
    log_day = yesterday.strftime('%y%m%d')   # e.g. 170920
    print("log_day=", log_day)

    if len(sys.argv) > 1:
        cur_day = sys.argv[1]

    tabletime = cur_day
    print("tabletime=", tabletime)
    date = "{0}-{1}-{2}".format(tabletime[0:4], tabletime[4:6], tabletime[6:])
    print("date=", date)                     # e.g. 2017-09-20
    datetmp = date + " 00:00:00"
    print("datetmp=", datetmp)
    timeArray = time.strptime(datetmp, _TIME_FORMAT)
    print("timeArray=", timeArray)
    timeStamp = int(time.mktime(timeArray))  # local midnight as epoch seconds
    print("timeStamp=", timeStamp)


main()