SQLAlchemy 是一个用 Python 实现的 ORM (Object Relational Mapping)框架,它由多个组件构成,这些组件可以单独使用,也能独立使用。它的组件层次结构如下:
其中最常用的组件,应该是 ORM 和 SQL 表达式语言,这两者既可以独立使用,也能结合使用。
ORM 的好处在于它
- 自动处理了数据库和 Python 对象之间的映射关系,屏蔽了两套系统之间的差异。程序员不需要再编写复杂的 SQL 语句,直接操作 Python 对象就行。
- 屏蔽了各数据库之间的差异,更换底层数据库不需要修改 SQL 语句,改下配置就行。
- 使数据库结构文档化,models 定义很清晰地描述了数据库的结构。
- 避免了不规范、冗余、风格不统一的 SQL 语句,可以避免很多人为 Bug,也方便维护。
但是 ORM 需要消耗额外的性能来处理对象关系映射,此外用 ORM 做多表关联查询或复杂 SQL 查询时,效率低下。因此它适用于场景不太复杂,性能要求不太苛刻的场景。
都说 ORM 学习成本高,我自己也更倾向于直接使用 SQL 语句(毕竟更熟悉),因此这一篇笔记不涉及 ORM 部分,只记录 SQLAlchemy 的 Engine 与 SQL 表达式语言。
一、直接使用 Engine 和 Connections
第一步是创建数据库引擎实例:
from sqlalchemy import create_engine
engine = create_engine('sqlite:///:memory:',
echo=True, # echo=True 表示打印出自动生成的 SQL 语句(通过 logging)
pool_size=5, # 连接池容量,默认为 5,生产环境下太小,需要修改。
# 下面是 connection 回收的时间限制,默认 -1 不回收
pool_recyle=7200) # 超过 2 小时就重新连接(MySQL 默认的连接最大闲置时间为 8 小时)
create_engine
接受的第一个参数是数据库 URI,格式为 dialect[+driver]://user:password@host/dbname[?key=value..]
,dialect 是具体的数据库名称,driver 是驱动名称。key-value 是可选的参数。举例:
# PostgreSQL
postgresql+psycopg2://scott:tiger@localhost/dbtest
# MySQL + PyMySQL(或者用更快的 mysqlclient)
mysql+pymysql://scott:tiger@localhost/dbtest
# sqlite 内存数据库
# 注意 sqlite 要用三个斜杠,表示不存在 hostname,sqlite://<nohostname>/<path>
sqlite:///:memory:
# sqlite 文件数据库
# 四个斜杠是因为文件的绝对路径以 / 开头:/home/ryan/Codes/Python/dbtest.db
sqlite:////home/ryan/Codes/Python/dbtest.db
# SQL Server + pyodbc
# 首选基于 dsn 的连接,dsn 的配置请搜索hhh
mssql+pyodbc://scott:tiger@some_dsn
引擎创建后,我们就可以直接获取 connection,然后执行 SQL 语句了。这种用法相当于把 SQLAlchemy 当成带 log 的数据库连接池使用:
with engine.connect() as conn:
res = conn.execute("select username from users") # 无参直接使用
# 使用问号作占位符,前提是下层的 DBAPI 支持。更好的方式是使用 text(),这个后面说
conn.execute("INSERT INTO table (id, value) VALUES (?, ?)", 1, "v1") # 参数不需要包装成元组
# 查询返回的是 ResultProxy 对象,有和 dbapi 相同的 fetchone()、fetchall()、first() 等方法,还有一些拓展方法
for row in result:
print("username:", row['username'])
但是要注意的是,connection 的 execute 是自动提交的(autocommit),这就像在 shell 里打开一个数据库客户端一样,分号结尾的 SQL 会被自动提交。
只有在 BEGIN TRANSACTION
内部,AUTOCOMMIT
会被临时设置为 FALSE
,可以通过如下方法开始一个内部事务:
def transaction_a(connection):
trans = connection.begin() # 开启一个 transaction
try:
# do sthings
trans.commit() # 这里需要手动提交
except:
trans.rollback() # 出现异常则 rollback
raise
# do other things
with engine.connect() as conn:
transaction_a(conn)
1. 使用 text() 构建 SQL
相比直接使用 string,text() 的优势在于它:
- 提供了统一的参数绑定语法,与具体的 DBAPI 无关。
# 1. 参数绑定语法
from sqlalchemy import text
result = connection.execute(
# 使用 :key 做占位符
text('select * from table where id < :id and typeName=:type'),
{'id': 2,'type':'USER_TABLE'}) # 用 dict 传参数,更易读
# 2. 参数类型指定
from sqlalchemy import DateTime
date_param=datetime.today()+timedelta(days=-1*10)
sql="delete from caw_job_alarm_log where alarm_time<:alarm_time_param"
# bindparams 是 bindparam 的列表,bindparam 则提供参数的一些额外信息(类型、值、限制等)
t=text(sql, bindparams=[bindparam('alarm_time_param', type_=DateTime, required=True)])
connection.execute(t, {"alarm_time_param": date_param})
- 可以很方便地转换 Result 中列的类型
stmt = text("SELECT * FROM table",
# 使用 typemap 指定将 id 列映射为 Integer 类型,name 映射为 String 类型
typemap={'id': Integer, 'name': String},
)
二、SQL 表达式语言
SQLAlchemy 表达式语言是一个使用 Python 结构表示关系数据库结构和表达式的系统。
1. 定义并创建表
SQL 表达式语言使用 Table 来定义表,而表的列则用 Column 定义。Column 总是关联到一个 Table 对象上。
一组 Table 对象以及它们的子对象的集合就被称作「数据库元数据(database metadata)」。metadata 就像你的网页分类收藏夹,相关的 Table 放在一个 metadata 中。
下面是创建元数据(一组相关联的表)的例子,:
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey
metadata = MetaData() # 先创建元数据(收藏夹)
users = Table('user', metadata, # 创建 user 表,并放到 metadata 中
Column('id', Integer, primary_key=True),
Column('name', String),
Column('fullname', String)
)
addresses = Table('address', metadata,
Column('id', Integer, primary_key=True),
Column('user_id', None, ForeignKey('user.id')), # 外键,引用 user 表的 id 列
Column('email_address', String, nullable=False)
)
metadata.create_all(engine) # 使用 engine 创建 metadata 内的所有 Tables(会检测表是否已经存在,所以可以重复调用)
2. 增删改语句
- 增:
# 方法一,使用 values 传参
ins = users.insert().values(name="Jack", fullname="Jack Jones") # 可以通过 str(ins) 查看自动生成的 sql
connection.execute(ins)
# 方法二,参数传递给 execute()
conn.execute(users.insert(), id=2, name='wendy', fullname='Wendy Williams')
# 方法三,批量 INSERT,相当于 executemany
conn.execute(addresses.insert(), [ # 插入到 addresses 表
{'user_id': 1, 'email_address': '[email protected]'}, # 传入 dict 列表
{'user_id': 1, 'email_address': '[email protected]'},
{'user_id': 2, 'email_address': '[email protected]'},
{'user_id': 2, 'email_address': '[email protected]'}
])
# 此外,通过使用 bindparam,INSERT 还可以执行更复杂的操作
stmt = users.insert() \
.values(name=bindparam('_name') + " .. name") # string 拼接
conn.execute(stmt, [
{'id':4, '_name':'name1'},
{'id':5, '_name':'name2'},
{'id':6, '_name':'name3'},
])
- 删:
_table.delete() \
.where(_table.c.f1==value1) \
.where(_table.c.f2==value2) # where 指定条件
- 改:
# 举例
stmt = users.update() \
.where(users.c.name == 'jack') \
.values(name='tom')
conn.execute(stmt)
# 批量更新
stmt = users.update().\
where(users.c.name == bindparam('oldname')).\
values(name=bindparam('newname'))
conn.execute(stmt, [
{'oldname':'jack', 'newname':'ed'},
{'oldname':'wendy', 'newname':'mary'},
{'oldname':'jim', 'newname':'jake'},
])
可以看到,所有的条件都是通过 where
指定的,它和后面 ORM 的 filter 接受的参数是一样的。(详细的会在第二篇文章里讲)
查询暂时略过(因为感觉手写 sql 更方便。。)