# -*- coding:utf-8 -*- import pymysql class mysql:
def __init__(self, host, port, dbuser, dbpwd, dbname):
self.host = host
self.port = port
self.dbuser = dbuser
self.dbpwd = dbpwd
self.dbname = dbname
self._conn = None
self.Connect() # 默认链接数据库, 最后要调用关闭链接的函数Con_close()函数 def Connect(self):
if not self._conn:
# print('\033[33m 已创建数据库链接.......... \033[5m')
self._conn = pymysql.connect(host=self.host,
port=self.port,
user=self.dbuser,
passwd=self.dbpwd,
db=self.dbname,
use_unicode=True,
charset='utf8',
cursorclass=pymysql.cursors.DictCursor) # cursorclass=pymysql.cursors.DictCursor
# MySQLdb默认查询结果都是返回tuple,输出时候不是很方便,必须按照0,1这样读取,
# 无意中在网上找到简单的修改方法,就是传递一个cursors.DictCursor就行。
else:
print('\033[33m 链接数据库已存在,请直接操作!\033[5m')
pass def Con_close(self):
if self._conn:
# print('\033[33m 数据库链接已关闭!!! \033[5m')
self._conn.close()
self._conn = None
pass
pass def NewCursor(self):
cur = self._conn.cursor()
if cur:
return cur
else:
print('\033[33m #Error# Get New Cursor Failed \033[5m')
return None
pass def DelCursor(self, cur):
if cur:
cur.close()
pass
pass def Export(self, sql, file_name, colfg='\t\t'):
"""
将查询结果导出到外部文件中
:param sql:
:param file_name:
:param colfg:
:return:
"""
rt = self.Query(sql)
if rt:
with open(file_name, 'w') as fd:
for row in rt:
in_info = ''
for col in row:
in_info += str(col) + colfg
in_info += '\n'
fd.write(in_info) def Query(self, sql, debug=0):
"""
对数据库执行查询操作,sql为所需要执行的查询语句,debug控制是否显示查询的表结构(默认不查询即debug=0)
:param sql:
:param debug:
:return:
"""
rt = []
cur = self.NewCursor()
if not cur:
return rt
try:
cur.execute(sql)
while 1:
ts = cur.fetchone()
if ts is None:
break
rt.append(ts)
if debug:
filename = []
for field in cur.description:
filename.append(field[0])
print(filename)
# print('\033[33m Query Success!!! \033[5m')
except Exception as err:
print(err)
print('\033[33m Exec Sql failed: "{0}" \033[5m'.format(sql))
finally:
self.DelCursor(cur)
return rt def Exec(self, sql):
"""
对数据库执行增、删、改操作,传入需要执行的SQL语句
:param sql:
:return:
"""
rt = None
cur = self.NewCursor()
if not cur:
return rt
try:
cur.execute(sql)
# print('\033[33m update/delete/insert Success! \033[5m')
self._conn.commit()
except Exception as err:
print('\033[33m Exec Sql Failed: "{0}" \033[5m'.format(sql))
print(err)
cur.rollback()
finally:
self.DelCursor(cur)
return rt