本文实例为大家分享了 Python 每天迁移 MySQL 历史数据到历史库的具体代码，供大家参考，具体内容如下：

#!/usr/bin/env python
# coding:utf-8
__author__ = 'John'


import MySQLdb
import sys
import datetime
import time

class ClassMigrate(object):
  """Archive one day of rows from a source MySQL table into a destination table.

  Settings are read from ``sys.argv`` (see ``_get_argv``).  ``do()`` runs the
  whole pipeline: create the destination table from a template table, copy
  the rows whose date column falls on the day ``time_interval`` days before
  today, verify the copy, then remove those rows from the source, either by
  chunked DELETE or by dropping the partition that holds them.

  Progress is printed to stdout (messages carry a ``<br>`` suffix) and the
  process terminates through ``sys.exit`` with script-specific status codes.
  The code is written to be valid on both Python 2.6+ and Python 3.
  """

  # (attribute set by the parser, command-line option that sets it)
  _REQUIRED_OPTIONS = (
    ('source_host', '--source'),
    ('dest_host', '--dest'),
    ('deleteStrategy', '--delete_strategy'),
    ('pk', '--primary_key'),
    ('date_col', '--date_col'),
    ('interval', '--time_interval'),
  )

  def _exit_with_usage(self, status=1):
    """Print the usage text and terminate the process with ``status``."""
    print(self.usage)
    sys.exit(status)

  @staticmethod
  def _parse_endpoint(spec):
    """Split ``host:port/db:table/user/password`` into a 6-tuple.

    The password is everything after the third ``/`` so it may itself
    contain ``/`` characters.

    Returns:
      ``(host, port, db, table, user, password)`` with ``port`` as an int.

    Raises:
      IndexError or ValueError when a component is missing or the port is
      not an integer.
    """
    parts = spec.split('/', 3)
    host_port = parts[0].split(':')
    db_tab = parts[1].split(':')
    return (host_port[0], int(host_port[1]), db_tab[0], db_tab[1], parts[2], parts[3])

  @staticmethod
  def _format_elapsed(delta):
    """Render a ``datetime.timedelta`` as ``seconds.microseconds``.

    Microseconds are zero-padded to six digits so that, for example,
    1 s + 123 us is shown as ``1.000123`` rather than ``1.123``.
    """
    return "%d.%06d" % (delta.days * 86400 + delta.seconds, delta.microseconds)

  def _get_argv(self):
    """Parse ``sys.argv`` into the attributes used by the migration.

    Sets ``source_*`` / ``dest_*`` (host, port, db, tab, user, password),
    ``deleteStrategy``, ``pk``, ``date_col`` and ``interval``.

    Exits with status 0 after printing usage for ``--help`` / ``-h`` as the
    first argument, and with status 1 (after printing usage) when no
    arguments are given, an option is unknown or malformed, or a required
    option is missing.
    """
    self.usage = """
      usage():
      python daily_migration.py --source=192.168.1.4:3306/db_name:tab_name/proxy/password \\
                    --dest=192.168.1.150:13301/db_name_archive:tab_name_201601/proxy/password \\
                    --delete_strategy=delete --primary_key=auto_id --date_col=ut --time_interval=180
      """
    args = sys.argv[1:]
    if not args:
      self._exit_with_usage(1)
    if args[0] in ('--help', '-h'):
      self._exit_with_usage(0)

    for arg in args:
      # Split on the first '=' only, so values (e.g. passwords) may contain '='.
      name, _, value = arg.partition('=')
      try:
        if name == '--source':
          (self.source_host, self.source_port, self.source_db,
           self.source_tab, self.source_user, self.source_password) = self._parse_endpoint(value)
        elif name == '--dest':
          (self.dest_host, self.dest_port, self.dest_db,
           self.dest_tab, self.dest_user, self.dest_password) = self._parse_endpoint(value)
        elif name == '--delete_strategy':
          if value not in ('delete', 'drop'):
            raise ValueError("expected 'delete' or 'drop'")
          self.deleteStrategy = value
        elif name == '--primary_key':
          if not value:
            raise ValueError("empty value")
          self.pk = value
        elif name == '--date_col':
          if not value:
            raise ValueError("empty value")
          self.date_col = value
        elif name == '--time_interval':
          # The value is interpolated into SQL, so only plain digits are accepted.
          if not value.isdigit():
            raise ValueError("expected a non-negative integer")
          self.interval = value
        else:
          raise ValueError("unknown option")
      except (IndexError, ValueError):
        # Only the option name is echoed: the value may contain a password.
        print("Invalid or unknown option: %s" % name)
        self._exit_with_usage(1)

    missing = [opt for attr, opt in self._REQUIRED_OPTIONS if not hasattr(self, attr)]
    if missing:
      print("Missing required option(s): %s" % ", ".join(missing))
      self._exit_with_usage(1)

  def __init__(self):
    """Parse the command line, connect to both databases and reset state."""
    self._get_argv()
    # Both connections run in autocommit mode.
    self.sourcedb_conn_str = MySQLdb.connect(host=self.source_host, port=self.source_port, user=self.source_user, passwd=self.source_password, db=self.source_db, charset='utf8')
    self.sourcedb_conn_str.autocommit(True)
    self.destdb_conn_str = MySQLdb.connect(host=self.dest_host, port=self.dest_port, user=self.dest_user, passwd=self.dest_password, db=self.dest_db, charset='utf8')
    self.destdb_conn_str.autocommit(True)
    # Table that the destination table is cloned from (CREATE TABLE ... LIKE).
    self.template_tab = self.source_tab + '_template'
    # Width of the primary-key range handled per copy/delete statement.
    self.step_size = 20000
    # Set by verify_total_cnt() / the delete step respectively.
    self._migCompleteState = False
    self._deleteCompleteState = False
    # Filled in by get_min_max() and verify_total_cnt().
    self.source_cnt = ''
    self.source_min_id = ''
    self.source_max_id = ''
    self.source_checksum = ''
    self.dest_cnt = ''
    # Reference date passed to DATE_ADD in every query.
    self.today = time.strftime("%Y-%m-%d")

  def sourcedb_query(self, sql, sql_type):
    """Run ``sql`` on the source connection.

    Args:
      sql: statement text.
      sql_type: ``'select'`` returns all fetched rows, ``'dml'`` returns the
        connection's affected-row count, anything else returns ``True``.

    Returns:
      The value described above, or ``False`` if any exception was raised
      (the exception text is printed).
    """
    cr = None
    try:
      cr = self.sourcedb_conn_str.cursor()
      cr.execute(sql)
      if sql_type == 'select':
        return cr.fetchall()
      elif sql_type == 'dml':
        return self.sourcedb_conn_str.affected_rows()
      else:
        return True
    except Exception as e:
      print(str(e) + "<br>")
      return False
    finally:
      # cursor() itself may have failed, in which case there is nothing to close.
      if cr is not None:
        cr.close()

  def destdb_query(self, sql, sql_type, values=''):
    """Run ``sql`` on the destination connection.

    Args:
      sql: statement text.
      sql_type: ``'select'`` returns all fetched rows; ``'insertmany'`` runs
        ``executemany(sql, values)`` and returns the affected-row count;
        anything else executes ``sql`` and returns ``True``.
      values: parameter rows for ``'insertmany'``.

    Returns:
      The value described above, or ``False`` if any exception was raised
      (the exception text is printed).  An empty ``values`` for
      ``'insertmany'`` returns 0 without touching the database.
    """
    cr = None
    try:
      cr = self.destdb_conn_str.cursor()
      if sql_type == 'select':
        cr.execute(sql)
        return cr.fetchall()
      elif sql_type == 'insertmany':
        if not values:
          # Nothing to insert; avoid reporting a stale affected_rows() value.
          return 0
        cr.executemany(sql, values)
        return self.destdb_conn_str.affected_rows()
      else:
        cr.execute(sql)
        return True
    except Exception as e:
      print(str(e) + "<br>")
      return False
    finally:
      if cr is not None:
        cr.close()

  def create_table_from_source(self):
    """Create a table on the destination from the source table's DDL.

    Original author's note: because tab_name's data has to be migrated into
    an ARCHIVE-engine table, this approach is not suitable there; the method
    is kept for other uses.  It is not called by ``do()``.

    Returns:
      ``True`` if the DDL was fetched and executed on the destination,
      ``False`` otherwise.
    """
    try:
      sql = "show create table %s;" % self.source_tab
      rows = self.sourcedb_query(sql, 'select')
      if not rows:
        return False
      # Only the leading keyword is rewritten, not later occurrences in the DDL.
      create_str = rows[0][1].replace('CREATE TABLE', 'CREATE TABLE IF NOT EXISTS', 1)
      return bool(self.destdb_query(create_str, 'ddl'))
    except Exception as e:
      print(str(e) + "<br>")
      return False

  def create_table_from_template(self):
    """Create the destination table ``LIKE`` the template table if absent.

    Returns:
      ``True`` when the statement ran without error, ``False`` otherwise.
    """
    try:
      sql = 'CREATE TABLE IF NOT EXISTS %s like %s;' % (self.dest_tab, self.template_tab)
      return bool(self.destdb_query(sql, 'ddl'))
    except Exception as e:
      print(str(e) + "<br>")
      return False

  def get_min_max(self):
    """Fetch count, min id and max id of the source rows to migrate.

    Stores the values in ``source_cnt``, ``source_min_id``,
    ``source_max_id`` and builds ``source_checksum`` as
    ``"<count>_<min>_<max>"``.

    Returns:
      ``True`` when at least one row matches the date window, ``False``
      when none match or the query failed.
    """
    try:
      print("\nStarting Migrate at -- %s <br>" % datetime.datetime.now())
      sql = """select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s where %s >= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') \
           and %s <= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """ \
            % (self.pk, self.pk, self.source_tab, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval)
      q = self.sourcedb_query(sql, 'select')
      if not q:
        # sourcedb_query already printed the database error.
        return False
      self.source_cnt = q[0][0]
      self.source_min_id = q[0][1]
      self.source_max_id = q[0][2]
      self.source_checksum = str(self.source_cnt) + '_' + str(self.source_min_id) + '_' + str(self.source_max_id)
      if self.source_cnt == 0 or self.source_min_id == -1 or self.source_max_id == -1:
        print("There is 0 record in source table been matched! <br>")
        return False
      return True
    except Exception as e:
      print(str(e) + "<br>")
      return False

  def migrate_2_destdb(self):
    """Copy the matching source rows to the destination in id-range chunks.

    Walks the primary key from ``source_min_id`` to ``source_max_id`` in
    steps of ``step_size``, selecting each chunk from the source and bulk
    inserting it into the destination.  A failed chunk is reported and the
    loop continues; ``verify_total_cnt`` is what detects an incomplete copy.

    Returns:
      ``True`` once the loop has finished, ``False`` when there is nothing
      to migrate or an unexpected error occurred.
    """
    try:
      if not self.get_min_max():
        return False
      cols = self.sourcedb_query("desc %s;" % self.source_tab, 'select')
      if not cols:
        return False
      # One %s placeholder per source column, spliced into the insert statement.
      placeholders = ",".join(["%s"] * len(cols))
      insert_sql = "insert into " + self.dest_tab + " values (%s)" % placeholders

      k = self.source_min_id
      while k <= self.source_max_id:
        sql = """select * from %s where %s >= %d and %s< %d \
               and %s >= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') \
               and %s <= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """\
             % (self.source_tab, self.pk, k, self.pk, k+self.step_size, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval)
        print("\n%s <br>" % sql)
        starttime = datetime.datetime.now()
        results = self.sourcedb_query(sql, 'select')
        if results is False:
          rows = False
        else:
          rows = self.destdb_query(insert_sql, 'insertmany', results)
        # Identity check: 0 inserted rows is a valid result, not a failure.
        if rows is False:
          print("Insert failed!! <br>")
        else:
          print("Inserted %s rows. <br>" % rows)
        elapsed = datetime.datetime.now() - starttime
        print("Elapsed :" + self._format_elapsed(elapsed) + " seconds <br>")
        k += self.step_size
      print("\nInsert complete at -- %s <br>" % datetime.datetime.now())
      return True
    except Exception as e:
      print(str(e) + "<br>")
      return False

  def verify_total_cnt(self):
    """Compare the destination's count/min/max with the source checksum.

    Sets ``dest_cnt`` and, on a match with a non-zero count, sets
    ``_migCompleteState`` to ``True``.  Exits the process with status 77
    when the checksums differ or the verification query fails.
    """
    try:
      sql = """select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s where %s >= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') \
           and %s <= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """ \
            % (self.pk, self.pk, self.dest_tab, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval)
      dest_result = self.destdb_query(sql, 'select')
      if not dest_result:
        # The copy cannot be confirmed, so treat it as a failed verification.
        print("Verify failed !!<br>")
        sys.exit(77)
      self.dest_cnt = dest_result[0][0]
      dest_checksum = str(self.dest_cnt) + '_' + str(dest_result[0][1]) + '_' + str(dest_result[0][2])
      print("source_checksum: %s, dest_checksum: %s <br>" % (self.source_checksum, dest_checksum))
      if self.source_cnt == dest_result[0][0] and dest_result[0][0] != 0 and self.source_checksum == dest_checksum:
        self._migCompleteState = True
        print("Verify successfully !!<br>")
      else:
        print("Verify failed !!<br>")
        sys.exit(77)
    except Exception as e:
      # SystemExit is not an Exception subclass, so the exits above propagate.
      print(str(e) + "<br>")

  def drop_daily_partition(self):
    """Drop the source partition holding the migrated day, if it matches.

    Does nothing unless ``_migCompleteState`` is set.  The partition name is
    read from column 3 of ``EXPLAIN PARTITIONS``; the partition is dropped
    only when its count/min/max checksum equals ``source_checksum``, in
    which case ``_deleteCompleteState`` is set.  Exits with status 77 on a
    checksum mismatch.
    """
    if not self._migCompleteState:
      return
    try:
      sql = """explain partitions select * from %s where %s >= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00')
               and %s <= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """\
             % (self.source_tab, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval)
      plan = self.sourcedb_query(sql, 'select')
      partition_name = plan[0][3] if plan else None
      if not partition_name:
        print("Cannot determine the source partition! Drop failed !! <br>")
        return

      sql = """select count(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1) from %s partition (%s)""" \
            % (self.pk, self.pk, self.source_tab, partition_name)
      q = self.sourcedb_query(sql, 'select')
      if not q:
        # sourcedb_query already printed the database error.
        return
      source_cnt = q[0][0]
      source_min_id = q[0][1]
      source_max_id = q[0][2]
      checksum = str(source_cnt) + '_' + str(source_min_id) + '_' + str(source_max_id)
      if source_cnt == 0 or source_min_id == -1 or source_max_id == -1:
        print("There is 0 record in source PARTITION been matched! <br>")
        return
      if checksum != self.source_checksum:
        # The partition holds rows other than the ones that were copied.
        print("The partition %s checksum failed !! Drop failed !!" % partition_name)
        sys.exit(77)

      drop_par_sql = "alter table %s drop partition %s;" % (self.source_tab, partition_name)
      droped = self.sourcedb_query(drop_par_sql, 'ddl')
      print(drop_par_sql + " <br>")
      if droped:
        print("\nDrop partition complete at -- %s <br>" % datetime.datetime.now())
        self._deleteCompleteState = True
      else:
        print("Drop partition failed.. <br>")
    except Exception as e:
      print(str(e) + "<br>")

  def delete_data(self):
    """Delete the migrated rows from the source in id-range chunks.

    Does nothing unless ``_migCompleteState`` is set.  Sleeps one second
    between chunks.  ``_deleteCompleteState`` is set only when every chunk
    was deleted without error.
    """
    if not self._migCompleteState:
      return
    try:
      all_ok = True
      k = self.source_min_id
      while k <= self.source_max_id:
        sql = """delete from %s where %s >= %d and %s< %d \
               and %s >= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 00:00:00') \
               and %s <= CONCAT(DATE_FORMAT(DATE_ADD('%s', INTERVAL -%s day),'%%Y-%%m-%%d'), ' 23:59:59') """ \
             % (self.source_tab, self.pk, k, self.pk, k+self.step_size, self.date_col, self.today, self.interval, self.date_col, self.today, self.interval)
        print("\n%s <br>" % sql)
        starttime = datetime.datetime.now()
        rows = self.sourcedb_query(sql, 'dml')
        # Identity check: a chunk that deletes 0 rows is not a failure.
        if rows is False:
          all_ok = False
          print("Delete failed!! <br>")
        else:
          print("Deleted %s rows. <br>" % rows)
        elapsed = datetime.datetime.now() - starttime
        print("Elapsed :" + self._format_elapsed(elapsed) + " seconds <br>")
        time.sleep(1)
        k += self.step_size
      print("\nDelete complete at -- %s <br>" % datetime.datetime.now())
      if all_ok:
        self._deleteCompleteState = True
    except Exception as e:
      print(str(e) + "<br>")

  def do(self):
    """Run the full migration and exit with the script's status code.

    Exit statuses: 255 when the destination table cannot be created, 88
    when the rows were copied, verified and removed from the source, 254
    when the removal step did not complete (77 may be raised by the
    verification/drop steps).  Returns normally when there was nothing to
    migrate.
    """
    if not self.create_table_from_template():
      print("Create table failed ! Exiting. . .")
      sys.exit(255)
    if not self.migrate_2_destdb():
      return
    self.verify_total_cnt()
    if not self._migCompleteState:
      return

    if self.deleteStrategy == 'drop':
      self.drop_daily_partition()
    else:
      self.delete_data()
    print("\n<br>")
    print("====="*5 + '<br>')
    print("source_total_cnt: %s <br>" % self.source_cnt)
    print("dest_total_cnt: %s <br>" % self.dest_cnt)
    print("====="*5 + '<br>')
    if self._deleteCompleteState:
      print("\nFinal result: Successfully !! <br>")
      sys.exit(88)
    else:
      print("\nFinal result: Failed !! <br>")
      sys.exit(254)

# Run the migration only when executed as a script, so that importing this
# module does not open database connections or call sys.exit().
if __name__ == '__main__':
  f = ClassMigrate()
  f.do()

以上就是本文的全部内容，希望对大家的学习有所帮助，也希望大家多多支持脚本之家。

01-31 15:23