整理一个使用 Python3 导出 MySQL 查询数据的脚本。
Python依赖包:
pymysql
xlwt
Python脚本:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# =============================================================================
# FileName:
# Desc:
# Author:
# Email:
# HomePage:
# Version:
# LastChange:
# History:
# =============================================================================
import pymysql
import traceback
import logging
import xlwt
import datetime

logger = logging.getLogger(__name__)

# Layout of the warning that is logged when a single statement fails.
_FAILURE_TEMPLATE = (
    "\n"
    "execute script:{mysql_script}\n"
    "execute paras:{mysql_paras},\n"
    "execute exception:{mysql_exception}\n"
    "execute traceback:{mysql_traceback}\n"
)


def _log_statement(mysql_host, mysql_script, mysql_paras):
    """Emit the debug record describing a statement about to be executed."""
    message = "在服务器{0}上执行脚本:{1},参数为:{2}".format(
        mysql_host, mysql_script, str(mysql_paras))
    logger.debug(message)


def _log_statement_failure(mysql_script, mysql_paras, ex):
    """Log a failed statement with its parameters, error and traceback.

    Must be called from inside an ``except`` block so that
    ``traceback.format_exc()`` describes the exception being handled.
    """
    logger.warning(_FAILURE_TEMPLATE.format(
        mysql_script=mysql_script,
        mysql_paras=str(mysql_paras),
        mysql_exception=str(ex),
        mysql_traceback=traceback.format_exc()
    ))


def _release(cursor, conn):
    """Close ``cursor`` and ``conn``, skipping whichever is ``None``.

    The connection is closed in a ``finally`` clause so that an error raised
    while closing the cursor cannot leave the connection open.
    """
    try:
        if cursor is not None:
            cursor.close()
    finally:
        if conn is not None:
            conn.close()


class MySQLServer(object):
    """Thin helper around ``pymysql`` bound to one server and database.

    Every public method opens its own connection, runs its statements,
    commits, and closes the connection again; no connection is kept between
    calls.
    """

    def __init__(self, mysql_host,
                 mysql_user,
                 mysql_password,
                 mysql_port=3306,
                 database_name="mysql",
                 mysql_charset="utf8",
                 connect_timeout=60):
        """Store the connection settings; no connection is opened here.

        :param mysql_host: host passed to ``pymysql.connect`` as ``host``.
        :param mysql_user: user name passed as ``user``.
        :param mysql_password: password passed as ``passwd``.
        :param mysql_port: port passed as ``port``.
        :param database_name: database passed as ``db``.
        :param mysql_charset: character set passed as ``charset``.
        :param connect_timeout: value passed as ``connect_timeout``.
        """
        self.mysql_host = mysql_host
        self.mysql_user = mysql_user
        self.mysql_password = mysql_password
        self.mysql_port = mysql_port
        self.connect_timeout = connect_timeout
        self.mysql_charset = mysql_charset
        self.database_name = database_name

    def get_connection(self, return_dict=False):
        """Open and return a new MySQL connection to the configured server.

        :param return_dict: when true, cursors created from the connection
            are ``pymysql.cursors.DictCursor``; otherwise they are
            ``pymysql.cursors.Cursor``.
        :return: the object returned by ``pymysql.connect``; the caller is
            responsible for closing it.
        """
        if return_dict:
            cursor_class = pymysql.cursors.DictCursor
        else:
            cursor_class = pymysql.cursors.Cursor
        return pymysql.connect(
            host=self.mysql_host,
            user=self.mysql_user,
            passwd=self.mysql_password,
            port=self.mysql_port,
            connect_timeout=self.connect_timeout,
            charset=self.mysql_charset,
            db=self.database_name,
            cursorclass=cursor_class
        )

    def mysql_exec(self, mysql_script, mysql_paras=None):
        """Execute one statement on a fresh connection and commit it.

        :param mysql_script: SQL text handed to ``cursor.execute``.
        :param mysql_paras: optional parameters handed to ``cursor.execute``.
        :raises Exception: any error raised while connecting, executing or
            committing is logged as a warning and then re-raised unchanged,
            keeping its original type and traceback.
        """
        conn = None
        cursor = None
        try:
            _log_statement(self.mysql_host, mysql_script, mysql_paras)
            conn = self.get_connection()
            cursor = conn.cursor()
            # The parameter argument of execute() defaults to None, so it can
            # be passed through without branching on it.
            cursor.execute(mysql_script, mysql_paras)
            conn.commit()
        except Exception as ex:
            _log_statement_failure(mysql_script, mysql_paras, ex)
            raise
        finally:
            _release(cursor, conn)

    def mysql_exec_many(self, script_items):
        """Execute several statements on one connection with a single commit.

        :param script_items: iterable of ``(sql_script, sql_paras)`` pairs;
            ``sql_paras`` may be ``None``.
        :raises Exception: the first error is logged as a warning and
            re-raised unchanged; ``commit`` is not called in that case.
        """
        conn = None
        cursor = None
        try:
            conn = self.get_connection()
            cursor = conn.cursor()
            for sql_script, sql_paras in script_items:
                _log_statement(self.mysql_host, sql_script, sql_paras)
                cursor.execute(sql_script, sql_paras)
            # Commit once, only after every statement has succeeded.
            conn.commit()
        except Exception as ex:
            logger.warning("execute exception:{0} \n {1}".format(
                str(ex), traceback.format_exc()))
            raise
        finally:
            _release(cursor, conn)

    def mysql_query(self, mysql_script, mysql_paras=None, return_dict=False):
        """Run a query on a fresh connection and return all fetched rows.

        :param mysql_script: SQL text handed to ``cursor.execute``.
        :param mysql_paras: optional parameters handed to ``cursor.execute``.
        :param return_dict: forwarded to :meth:`get_connection` to select
            the cursor class (dict rows when true).
        :return: the result of ``cursor.fetchall()``.
        :raises Exception: any error is logged as a warning and re-raised
            unchanged, keeping its original type and traceback.
        """
        conn = None
        cursor = None
        try:
            _log_statement(self.mysql_host, mysql_script, mysql_paras)
            conn = self.get_connection(return_dict=return_dict)
            cursor = conn.cursor()
            cursor.execute(mysql_script, mysql_paras)
            exec_result = cursor.fetchall()
            conn.commit()
            return exec_result
        except Exception as ex:
            _log_statement_failure(mysql_script, mysql_paras, ex)
            raise
        finally:
            _release(cursor, conn)


# NOTE: the class name is misspelled ("Execl"); it is kept as-is because it
# is the public name callers use.
class ExeclExporter(object):
    """Writes query rows to an ``.xls`` workbook using ``xlwt``."""

    @classmethod
    def export_excel(cls, file_path, row_items):
        """Write ``row_items`` to a single-sheet workbook at ``file_path``.

        The keys of the first row become the header in sheet row 0; each
        following sheet row holds one item, with values looked up by those
        keys. With no rows, a workbook with an empty sheet is saved.

        Failures are logged as a warning and not re-raised, so this method
        always returns ``None``.

        :param file_path: destination passed to ``Workbook.save``.
        :param row_items: sequence of mappings that share the keys of the
            first row (for example rows fetched with ``return_dict=True``);
            ``None`` is treated as an empty sequence.
        """
        try:
            rows = row_items if row_items is not None else ()
            work_book = xlwt.Workbook()
            work_sheet = work_book.add_sheet('sheet01', cell_overwrite_ok=True)

            # The header is taken from the first row only.
            column_names = list(rows[0].keys()) if len(rows) > 0 else []
            for column_index, column_name in enumerate(column_names):
                work_sheet.write(0, column_index, column_name)

            # Data starts at sheet row 1, directly below the header.
            for row_index, row_item in enumerate(rows, start=1):
                for column_index, column_name in enumerate(column_names):
                    work_sheet.write(
                        row_index, column_index, row_item[column_name])

            work_book.save(file_path)
        except Exception as ex:
            logger.warning("执行异常,异常信息:{0}\n堆栈信息:{1}".format(
                str(ex),
                traceback.format_exc()
            ))


def export_excel():
    """Query ``tb001`` and export the rows to a timestamped ``.xls`` file.

    The file is written to the current directory and named after the current
    local time (``YYYYmmddHHMMSS.xls``).
    """
    # NOTE(review): connection settings are hard-coded; consider moving them
    # to configuration before using this outside a demo.
    mysql_server = MySQLServer(
        mysql_host="192.168.199.194",
        mysql_port=3306,
        mysql_user='admin',
        mysql_password='admin',
        database_name='demo01',
        mysql_charset='utf8'
    )
    sql_script = "\nselect * from tb001 limit 10;\n"
    # Dict rows are required: the exporter derives the header from row keys.
    row_items = mysql_server.mysql_query(
        mysql_script=sql_script, return_dict=True)
    file_path = "./" + datetime.datetime.now().strftime("%Y%m%d%H%M%S.xls")
    ExeclExporter.export_excel(
        file_path=file_path,
        row_items=row_items)


if __name__ == '__main__':
    export_excel()