python执行sql文件
之前也找了不少时间,没找到python执行sql文件的方法,有需求就自己写了一个
正则水平不足,将就用了
不过原则上,需要导入含有大量数据的sql文件时不要使用python来做,mysql自身的效率会高很多
写这个东西目的
1、安全初始化数据库结构
2、避免使用系统中的mysql程序,达到完全由python实现初始化数据库功能
修正了一下,不再返回字符串,改为返回列表
最新修正,加入了一些新参数和校验
点击(此处)折叠或打开
def format_sql_file(init_sql_file, drop_table=False, with_data=False):
    """
    Split a mysqldump-exported sql file into individually executable statements.

    The file is read in one go, `--` comment lines are dropped, and the
    remaining text is sorted into statement categories with regular
    expressions.  Statements are returned in this order: non-table drops,
    table drops (only if drop_table), create table, alter, create view,
    DELIMITER-wrapped bodies, then (only if with_data) truncate, delete,
    insert and update.

    @param init_sql_file: path of the sql file
    @param drop_table: whether `drop table` statements are included in the result
    @param with_data: whether data-changing statements
                      (truncate/delete/insert/update) are included in the result
    @raise ParameterIllegal: raised for every validation or parsing failure
    @return: list of sql statements, always starting with 'set autocommit=1;'
    """
    if drop_table not in (True, False):
        raise ParameterIllegal('drop_table value error')
    if not os.path.exists(init_sql_file):
        raise ParameterIllegal('init_sql_file can not be found')
    try:
        size = os.path.getsize(init_sql_file)
    except OSError:
        raise ParameterIllegal('init_sql_file get size of init_sql_file catch error')
    if size >= util.MAX_SQL_FILE_SIZE:
        raise ParameterIllegal('size of init_sql_file too large')
    try:
        f = open(init_sql_file, 'r')
    except (IOError, OSError):
        # On Python 2 open() raises IOError, which is not an OSError subclass.
        raise ParameterIllegal('open init_sql_file catch error')
    try:
        line_list = f.readlines()
    except IOError:
        raise ParameterIllegal('read from file io error')
    except OSError:
        raise ParameterIllegal('read from file os error')
    finally:
        # Close on every path, including exceptions not handled above.
        f.close()

    # `drop table` statements
    drop_table_list = []
    # other drop statements (procedures, functions, triggers, ...)
    other_drop_list = []
    # create statements
    create_list = []
    # statements found between two DELIMITER commands
    sql_delimiter_list = []
    # create view statements
    view_sql_list = []
    # alter statements
    alter_list = []
    # insert statements
    insert_list = []
    # update statements
    update_list = []
    # delete statements
    delete_list = []
    # truncate statements
    truncate_list = []

    checked_line_list = []
    for line in line_list:
        # A statement terminator alone on its own line is rejected outright;
        # merging it into the previous line would require validating that
        # previous line as well.
        if line.strip() == ';':
            raise ParameterIllegal('first string is [;],error sql file')
        # Drop `--` comment lines.
        if line.startswith('--'):
            continue
        checked_line_list.append(line)
    temp_string = ''.join(checked_line_list)
    # Strip the BOM header
    if temp_string[:3] == BOM_HEAD:
        temp_string = temp_string[3:]
    if len(temp_string) < 10:
        raise ParameterIllegal('query_string_in_one_line less then 10')

    # table partition clause wrapped in a /*!50100 ... */ comment
    partion_in_annotations = re.compile(r'(/\*\!50100\s+(PARTITION\s+BY\s+[\S\s]+?)\s+\*/ {0,1});{0,1}')
    # drop statements and view definitions found inside comments
    drop_and_view_in_annotations = \
        re.compile(r'(drop.+?(\*/){0,1};)|(VIEW\s+?[\S]+?\s+?as\s+?select[\s\S]+?(\*/){0,1};)', re.IGNORECASE)
    # group 1: text from a DELIMITER command up to `DELIMITER ;`
    # group 2: a /* ... */ comment
    delimiter_and_annotations = re.compile(r'(DELIMITER[\s\S]+?DELIMITER\s+?;)|(/\*[\s\S]+?\*/ {0,1};{0,1})',
                                           re.IGNORECASE)
    # DELIMITER end marker; group 2 means DELIMITER appeared without `;`
    delimiter_end_intance = re.compile(r'(DELIMITER\s+?;\s*)|(DELIMITER\s*)', re.IGNORECASE)
    # every DELIMITER section and comment in the text, as (delimiter, comment) tuples
    delimiter_and_annotations_res = re.findall(delimiter_and_annotations, temp_string)
    # leading `create [DEFINER=...]` part, optionally preceded by `/*!NNNNN `
    create_intance = re.compile(r'(/\*!\d{1,10}\s+){0,1}create\s+?(DEFINER=\S+ ){0,1}\s*', re.IGNORECASE)
    # End patterns for the four supported redefined delimiters.  In each one
    # group 1 is `end <delimiter>`, group 3 is `end` alone at the end of the
    # line and group 4 is the bare delimiter.
    end_intance_by_mark = {
        '$$': re.compile(r'(end (\*/){0,1}\s*\$\$)|(end\s*$)|(\$\$)', re.IGNORECASE),
        ';;': re.compile(r'(end (\*/){0,1}\s*\;;)|(end\s*$)|(;;)', re.IGNORECASE),
        '!!': re.compile(r'(end (\*/){0,1}\s*!!)|(end\s*$)|(!!)', re.IGNORECASE),
        # The first alternative used to test for `!!` here, so `END //`
        # could only hit group 4 and was always rejected.
        '//': re.compile(r'(end (\*/){0,1}\s*//)|(end\s*$)|(//)', re.IGNORECASE),
    }
    # non-zero while inside a DELIMITER section
    delimiter_start = 0
    # end pattern of the DELIMITER section currently being read
    end_intance = None
    temp_list = []
    for res_tuple in delimiter_and_annotations_res:
        delimiter = res_tuple[0]
        annotations = res_tuple[1]
        if delimiter != '':  # DELIMITER section
            for line in delimiter.split('\n'):
                if line.strip() == '':
                    continue
                if not delimiter_start:
                    # First non-blank line: the DELIMITER command that
                    # defines the new terminator.
                    delimiter_start = 1
                    delimiter_mark = re.sub('delimiter', '', line, flags=re.IGNORECASE).strip()
                    end_intance = end_intance_by_mark.get(delimiter_mark)
                    if end_intance is None:
                        raise ParameterIllegal('unknown delimiter_mark')
                    continue
                delimiter_end = re.search(delimiter_end_intance, line)
                if delimiter_end is not None:
                    # End of the DELIMITER section
                    delimiter_start = 0
                    if delimiter_end.group(1) is None:
                        raise ParameterIllegal('find a delimiter with out start or end mark')
                    end_intance = None
                    continue
                if re.search(create_intance, line) is not None:
                    # Rewrite the statement head as a plain `CREATE ...`
                    # (drops the versioned comment opener and DEFINER).
                    temp_list.append('CREATE %s' % re.sub(create_intance, '', line))
                    continue
                if end_intance is not None:
                    end_res = re.search(end_intance, line)
                    if end_res is not None:
                        # `end` alone on a line
                        if end_res.group(3) is not None:
                            raise ParameterIllegal('find key word end in one line')
                        # the delimiter mark without a preceding `end`
                        if end_res.group(4) is not None:
                            raise ParameterIllegal('find commit_mark line')
                        # Close the routine body with a plain END
                        temp_list.append('END')
                        sql_delimiter_list.append('\n'.join(temp_list))
                        temp_list = []
                        continue
                temp_list.append(line)
        if annotations != '':  # comment
            # mysql 5.1 dumps place DROP PROCEDURE inside comments;
            # pick up drop statements and view definitions from the comment.
            drop_and_view_in_annotations_res = re.search(drop_and_view_in_annotations, annotations)
            if drop_and_view_in_annotations_res is not None:
                drop_in_annotations_string = drop_and_view_in_annotations_res.group(1)
                drop_ex_string = drop_and_view_in_annotations_res.group(2)
                view_string = drop_and_view_in_annotations_res.group(3)
                view_ex_string = drop_and_view_in_annotations_res.group(4)
                # NOTE(review): the original indentation was lost; both
                # appends below are assumed to run whether or not a
                # trailing `*/` was captured -- confirm.
                # drop statement
                if drop_in_annotations_string is not None:
                    if drop_ex_string is not None:
                        drop_in_annotations_string = drop_in_annotations_string.replace(drop_ex_string, '')
                    other_drop_list.append(drop_in_annotations_string)
                # view statement
                if view_string is not None:
                    if view_ex_string is not None:
                        view_string = view_string.replace(view_ex_string, '')
                    view_sql_list.append('CREATE %s' % view_string)
    # Still inside a DELIMITER section: its end marker was never matched
    if delimiter_start:
        raise ParameterIllegal('delimiter not end')

    # Unwrap partition clauses from their comment so they survive the
    # comment removal below.
    for find_string in re.findall(partion_in_annotations, temp_string):
        temp_string = temp_string.replace(find_string[0], find_string[1], 1)
    # Text left once comments and DELIMITER sections are removed
    no_annotations_string = re.sub(delimiter_and_annotations, '', temp_string)
    del line_list
    del checked_line_list
    del temp_string

    # Validates an assembled statement: group 1 is TABLE or VIEW, group 2 is
    # set only when a second CREATE TABLE/VIEW occurs in the same statement.
    create_table_intance \
        = re.compile(r'CREATE\s+(TABLE|VIEW)\s+?(([\s\S]*?)CREATE\s+(TABLE|VIEW)){0,}[\s\S]+?;\s*$', re.IGNORECASE)
    # Per-line classifier.  Group numbers used below:
    #   1 create database   2 use            3 insert       4 update
    #   5 alter table       6 delete from    7 truncate table
    #   8 create procedure/function/trigger (9 and 10 are nested groups)
    #  11 DEFINER=         12 drop table    13 drop database
    #  14 any other drop   15 any line ending with `;`
    re_string = r'(^CREATE DATABASE.+)|(^use .+?;\s*$)|' + \
                r'(^insert into .+?;\s*$)|(^update .+?set .+?;\s*$)|(^alter table .+?;\s*$)|' + \
                r'(^delete from .+?where .+?;\s*$)|(^truncate table\s+?.+?;\s*$)|' + \
                r'(CREATE\s+?(DEFINER=.+?\s+?){0,1}(PROCEDURE|FUNCTION|TRIGGER){1}.*)|' + \
                r'(DEFINER=.+?\s)|(^drop table.+?;\s*$)|(^drop\s+?database\s+?.+?;\s*$)|(drop {1}.+?;\s*$)|(.+?;\s*$)'
    split_intance = re.compile(re_string, re.IGNORECASE)
    auto_increment_intance = re.compile(r'AUTO_INCREMENT=\d{1,20} {1}', re.IGNORECASE)
    view_intance = re.compile(r'create\s+?(ALGORITHM=.+?\s+?){0,1}' +
                              r'(DEFINER=.+?\s+?){0,1}(SQL\s+?SECURITY\s+?DEFINER\s+?)view', re.IGNORECASE)
    # Classify line by line; temp_list collects the lines of a multi-line
    # statement until a line ending with `;` closes it.
    temp_list = []
    for line in no_annotations_string.split('\n'):
        if line.strip() == '':
            continue
        match_res = re.search(split_intance, line)
        if match_res is not None:
            # create database statement
            if match_res.group(1) is not None:
                raise ParameterIllegal('create database key word find, error')
            # use database statement
            if match_res.group(2) is not None:
                raise ParameterIllegal('use databse key word find, error')
            # insert statement
            if match_res.group(3) is not None:
                insert_list.append(match_res.group(3))
                continue
            # update statement
            if match_res.group(4) is not None:
                update_list.append(match_res.group(4))
                continue
            # alter statement
            if match_res.group(5) is not None:
                alter_list.append(match_res.group(5))
                continue
            # delete from statement
            if match_res.group(6) is not None:
                delete_list.append(match_res.group(6))
                continue
            # truncate table statement
            if match_res.group(7) is not None:
                truncate_list.append(match_res.group(7))
                continue
            # PROCEDURE|FUNCTION|TRIGGER definitions must sit inside a
            # DELIMITER section handled above; anywhere else is an error.
            if match_res.group(8) is not None:
                # Echo the offending text; written so that it behaves the
                # same on Python 2 and Python 3.
                print(match_res.group(8))
                raise ParameterIllegal('CREATE PROCEDURE|FUNCTION|TRIGGER not between two DELIMITER')
            # DEFINER= is accepted only as part of a view definition
            if match_res.group(11) is not None:
                if re.search(view_intance, line) is None:
                    raise ParameterIllegal('DEFINER key word find')
                view_sql_list.append(re.sub(view_intance, 'CREATE VIEW', line))
                continue
            # drop table statement
            if match_res.group(12) is not None:
                drop_table_list.append(match_res.group(12))
                continue
            # drop database statement
            if match_res.group(13) is not None:
                raise ParameterIllegal('drop database sql find')
            # any other drop statement
            if match_res.group(14) is not None:
                other_drop_list.append(match_res.group(14))
                continue
            # line ends with the terminator `;`: the statement is complete
            if match_res.group(15) is not None:
                # Strip AUTO_INCREMENT=n here as well; previously the
                # closing line of a statement skipped this substitution.
                temp_list.append(re.sub(auto_increment_intance, '', line))
                sql_string = '\n'.join(temp_list)
                temp_list = []
                # Only create table / create view statements are accepted here
                create_match = re.match(create_table_intance, sql_string)
                if create_match is None:
                    raise ParameterIllegal('sql [%s] not sql of create table or view' % sql_string)
                # more than one create inside this statement
                if create_match.group(2) is not None:
                    raise ParameterIllegal('sql [%s] find over one create in one' % sql_string)
                if create_match.group(1).lower() == 'view':
                    view_sql_list.append(sql_string)
                else:
                    create_list.append(sql_string)
                continue
        # Part of a multi-line statement: strip AUTO_INCREMENT=n and keep it
        temp_list.append(re.sub(auto_increment_intance, '', line))

    # Copy so that the category list itself is not extended in place.
    full_string_list = list(other_drop_list)
    if drop_table:
        full_string_list += drop_table_list
    full_string_list += create_list + alter_list + view_sql_list
    # bodies taken from the DELIMITER sections of the source file
    full_string_list += sql_delimiter_list
    if with_data:
        full_string_list += truncate_list + delete_list + insert_list + update_list
    if not full_string_list:
        raise ParameterIllegal('no sql line find in file')
    return ['set autocommit=1;'] + full_string_list