其实在2.0风格中, 主要受到影响的是ORM的查询方式, 详情见文档: 2.0 Migration - ORM Usage
安装
pip install sqlalchemy
检测sqlalchemy
版本:
>>>import sqlalchemy
>>>sqlalchemy.__version__
'1.4.27'
使用步骤
一般来说SQLAlchemy
的使用方式有两种: Core
和ORM
两种有什么不同呢?
ORM
是构建在Core
之上的Core
更加底层, 可以执行直接执行SQL语句ORM
类似于Django的ORM, 由于sqlalchemy提供了一套接口, 所以不需要我们直接写SQL语句 (1.x版本)- 至于要用哪个, 等到你用到时, 你会知道的
组件依赖关系图:
Core
一般来说, 使用步骤如下:
- 配置数据库连接
- 建立连接
- 创建表
- 执行SQL语句, 按需开启事件是否自动提交
- 拿到返回数据, 执行其他代码
数据库的连接的格式
我们在创建引擎(连接)时, 需要指定数据库的URL, URL格式, 见: Engine Configuration
, 总的来说, 格式就是: dialect[+driver]://user:password@host/dbname[?key=value..]
dialect
数据库名称(方言): 如mysqldriver
连接数据库的库: 如: pymysqluser
用户名password
密码host
地址dbname
数据库名称key=value
指的是给数据库的参数
如下面的URL:
mysql+pymysql://root:[email protected]:3306/test_db?charset=utf8
建立连接
调用sqlalchemy.create_engine
方法, 为了兼容2.0风格的接口, 可以加上future
参数. 至于什么是2.0风格的接口, 可以看看官方文档: 2.0 stylecreate_engine
有几个参数需要我们注意:
url
即数据库url, 其格式见上文: 数据库的连接的格式echo
参数为True
时, 将会将engine的SQL记录到日志中 ( 默认输出到标准输出)echo_pool
为True
时,会将连接池的记录信息输出future
使用2.0样式Engine
和Connection API
更多参数见官方文档: sqlalchemy.create_engine
例子
from sqlalchemy import create_engine
# 兼容2.0的写法
# 返回对象不一样
engine1 = create_engine("sqlite+pysqlite:///:memory:", echo=True, future=True)
print(type(engine1))
# <class 'sqlalchemy.future.engine.Engine'>
engine2 = create_engine("sqlite+pysqlite:///:memory:", echo=True)
print(type(engine2))
# <class 'sqlalchemy.engine.base.Engine'>
创建表
我们想要让数据库创建一个表, 需要利用MetaData
对象, 关于一些常用的MetaData
方法, 见: MetaData
除了要MetaData
对象外, 我们还需要Table
对象, 用于定义一个表的结构Table
的一般使用
mytable = Table("mytable", metadata,
Column('mytable_id', Integer, primary_key=True),
Column('value', String(50))
)
Table
的参数:
name
表名称metadata
该表所属的MetaData对象- 其他参数: 通过
Column
指定一列数据, 格式见: Column定义
例子:
from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, text
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
metadata_obj = MetaData()
user_table = Table(
"user_account",
metadata_obj,
Column('id', Integer, primary_key=True),
Column("username", String(30))) # String也可以不实例化
# 第二个表
address_table = Table(
"address",
metadata_obj,
Column("id", Integer, primary_key=True),
# 定义外键
Column("uid", ForeignKey("user_account.id"), nullable=False),
Column('email_address', String(32), nullable=False)
)
# 相当于执行 CREATE TABLE 语句
metadata_obj.create_all(engine)
"""
-- 相当于:
CREATE TABLE user_account (
id INTEGER NOT NULL AUTO_INCREMENT,
username VARCHAR(30),
PRIMARY KEY (id)
);
CREATE TABLE address (
id INTEGER NOT NULL AUTO_INCREMENT,
uid INTEGER NOT NULL,
email_address VARCHAR(32) NOT NULL,
PRIMARY KEY (id),
FOREIGN KEY(uid) REFERENCES user_account (id)
)
"""
Table
的一些属性
# ---------- 访问所有列
# .c => Column
print(user_table.c.keys())
# ['id', 'username']
# ---------- 访问某一列
print(repr(user_table.c.username))
# Column('username', String(length=30), table=<user>)
# ---------- 返回主键
print(user_table.primary_key)
# 隐式生成
# PrimaryKeyConstraint(Column('id', Integer(), table=<user>, primary_key=True, nullable=False))
在事务中执行SQL
通常, 我们通过调用engine.connect
和engine.begin
方法开始一个事件sqlalchemy
使用事务有两种风格commit as you go
和Begin once
, 前者需要我们手动提交, 后者会自动提交
手动提交
engine.connect
方法符合python的上下文管理协议, 会返回一个Connection
对象, 该方法会在不手动提交的情况下回滚.举个例子:
from sqlalchemy import create_engine
from sqlalchemy import text
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
with engine.connect() as conn:
# 执行
result = conn.execute(text("select 'hello world'")) # text 可以使用SQL语句
print(result.all())
# conn.commit()
# [('hello world',)]
# 最后会ROLLBACK
上面的代码中, 相当于开启了事务, 由于最后没有调用commit
方法, 所以会回滚.
自动提交
engine.begin
方法也符合python的上下文管理协议, 只要执行时不报错就会自动提交, 报错时会回滚.
from sqlalchemy import create_engine
from sqlalchemy import text
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
with engine.begin() as conn:
result = conn.execute(text("select 'hello world'"))
print(result.all())
# [('hello world',)]
# COMMIT
绑定参数
上面在事务中执行SQL语句时, 我们用到了sqlalchemy.text
, 可以直接定义文本SQL字符串
为了避免被SQL注入, 故在需要传入参数的场景中需要根据sqlalchemy
的方式传入, 而不是直接拼接成字符串.
使用:y
的格式定义参数, 且将值以字典的形式传给execute
from sqlalchemy import create_engine
from sqlalchemy import text
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
with engine.begin() as conn:
result = conn.execute(text("select name from userinfo where name like :y"), {"y": "lcz%"})
print(result.all())
# [('lczmx',)]
# COMMIT
多个参数时, 可以这样
with engine.connect() as conn:
conn.execute(
text("INSERT INTO userinfo (id, name) VALUES (:x, :y)"),
[{"x": 1, "y": "lcmx"}, {"x": 2, "y": "xxx"}])
conn.commit()
这种方式也可以
stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)
with engine.connect() as conn:
conn.execute(stmt)
conn.commit()
增删改查
处理使用text
直接执行SQL外, 你还可以使用其他语法增删改查数据
假如表结构如下:
$show create table address;
+---------+-----------------------------------------+
| Table | Create Table |
+---------+-----------------------------------------+
| address | CREATE TABLE `address` (
`id` int NOT NULL AUTO_INCREMENT,
`uid` int NOT NULL,
`email_address` varchar(32) NOT NULL,
PRIMARY KEY (`id`),
KEY `uid` (`uid`),
CONSTRAINT `address_ibfk_1` FOREIGN KEY (`uid`) REFERENCES `user_account` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=gbk |
+---------+------------------------------------------+
$show create table user_account;
+--------------+------------------------------------+
| Table | Create Table |
+--------------+------------------------------------+
| user_account | CREATE TABLE `user_account` (
`id` int NOT NULL AUTO_INCREMENT,
`username` varchar(30) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=gbk |
+--------------+-------------------------------------+
1 row in set (0.00 sec)
插入数据
使用insert(...).values(...)
形式为数据库插入数据
from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, insert
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".format(**DATABASE_CONFIG),
echo=True, future=True)
metadata_obj = MetaData()
user_table = Table(
"user_account",
metadata_obj,
Column('id', Integer, primary_key=True),
Column("username", String(30))) # String也可以不实例化
# 第二个表
address_table = Table(
"address",
metadata_obj,
Column("id", Integer, primary_key=True),
Column("uid", ForeignKey("user_account.id"), nullable=False),
Column('email_address', String(32), nullable=False)
)
metadata_obj.create_all(bind=engine)
with engine.connect() as conn:
# 插入一条普通数据
conn.execute(insert(user_table).values(id=1, username="lczmx"))
# 插入外键等数据
conn.execute(insert(address_table).values(uid=1, email_address="[email protected]"))
# 自动生成value, 不需要我们手动指定
conn.execute(insert(user_table),
[{"username": "张三"},
{"username": "李四"},
{"username": "王五"},
{"username": "赵六"},
])
conn.commit()
SQLAlchemy还提供了更复杂的用法, 见: Inserting Rows with Core
删除数据
使用delete(...).where(...)
的形式删除数据
目前的表数据:
select u.id as uid, u.username, a.id as aid, a.email_address as email_address
from user_account as u
left join address as a on u.id=a.uid;
+-----+----------+------+-------------------+
| uid | username | aid | email_address |
+-----+----------+------+-------------------+
| 1 | lczmx | 1 | [email protected] |
| 2 | 张三 | NULL | NULL |
| 3 | 李四 | NULL | NULL |
| 4 | 王五 | NULL | NULL |
| 5 | 赵六 | NULL | NULL |
+-----+----------+------+-------------------+
例子:
from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, delete
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".format(**DATABASE_CONFIG),
echo=True, future=True)
metadata_obj = MetaData()
user_table = Table(
"user_account",
metadata_obj,
Column('id', Integer, primary_key=True),
Column("username", String(30))) # String也可以不实例化
# 第二个表
address_table = Table(
"address",
metadata_obj,
Column("id", Integer, primary_key=True),
Column("uid", ForeignKey("user_account.id"), nullable=False),
Column('email_address', String(32), nullable=False)
)
metadata_obj.create_all(bind=engine)
with engine.connect() as conn:
# 一般删除
# user_table.c 获取的是 列数据
result1 = conn.execute(delete(user_table).where(user_table.c.id == 3))
print(f"受影响行数: {result1.rowcount}") # 受影响行数: 1
# and 删除
result2 = conn.execute(delete(user_table).where(user_table.c.username == "张三", user_table.c.id == 2))
print(f"受影响行数: {result2.rowcount}") # 受影响行数: 1
conn.commit()
更多见: The delete() SQL Expression Construct
更新数据
使用update(...).where(...).values(...)
的形式更新数据
select u.id as uid, u.username, a.id as aid, a.email_address as email_address
from user_account as u
left join address as a on u.id=a.uid;
+-----+----------+------+-------------------+
| uid | username | aid | email_address |
+-----+----------+------+-------------------+
| 1 | lczmx | 1 | [email protected] |
| 2 | 张三 | NULL | NULL |
| 3 | 李四 | NULL | NULL |
| 4 | 王五 | NULL | NULL |
| 5 | 赵六 | NULL | NULL |
+-----+----------+------+-------------------+
例子:
from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, update, bindparam, select
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".
format(**DATABASE_CONFIG), echo=True, future=True)
metadata_obj = MetaData()
user_table = Table(
"user_account",
metadata_obj,
Column('id', Integer, primary_key=True),
Column("username", String(30))) # String也可以不实例化
# 第二个表
address_table = Table(
"address",
metadata_obj,
Column("id", Integer, primary_key=True),
Column("uid", ForeignKey("user_account.id"), nullable=False),
Column('email_address', String(32), nullable=False)
)
metadata_obj.create_all(bind=engine)
with engine.connect() as conn:
# 一般更新
result1 = conn.execute(update(user_table).where(
user_table.c.username == "王五").values(username="王老五"))
print(f"受影响行数: {result1.rowcount}") # 受影响行数: 1
# 更新数据 加上 原来的数据
result2 = conn.execute(
update(user_table).where(user_table.c.username == "赵六").values(
username=user_table.c.username + "一号"))
print(f"受影响行数: {result2.rowcount}") # 受影响行数: 1
# 以字典的形式, 替换更新多个值
result3 = conn.execute(
update(user_table).where(user_table.c.username == bindparam('old_name')).values(
username=bindparam('new_name')),
[
{"old_name": "张三", "new_name": "新张三"},
{"old_name": "李四", "new_name": "新李四"},
]
)
print(f"受影响行数: {result3.rowcount}") # 受影响行数: 2
# 以 子查询 的方式 更新数据
scalar_subq = (
select(address_table.c.email_address).
where(address_table.c.uid == user_table.c.id).
order_by(address_table.c.id).
limit(1).
scalar_subquery()
)
# 将email_address的值 赋给 username
update(user_table).values(username=scalar_subq)
"""
-- 以上查询, 相当于:
UPDATE user_account SET username=(SELECT address.email_address
FROM address
WHERE address.uid = user_account.id ORDER BY address.id
LIMIT :param_1)
"""
conn.commit()
修改后的结果:
+-----+----------+------+-------------------+
| uid | username | aid | email_address |
+-----+----------+------+-------------------+
| 1 | lczmx | 1 | [email protected] |
| 2 | 新张三 | NULL | NULL |
| 3 | 新李四 | NULL | NULL |
| 4 | 王老五 | NULL | NULL |
| 5 | 赵六一号 | NULL | NULL |
+-----+----------+------+-------------------+
更多见: Updating and Deleting Rows with Core
查询数据
由于2.0的查询方式, Core和ORM都可以使用, 所以放在一起, 见下文: 查询数据详解
处理查询返回的数据
我们执行conn.execute
方法的结果为: CursorResult
对象
其本质上是继承与Result
对象, 其使用方式见: Result
例子:
假如查询的表:
mysql> select * from user_account;
+----+----------+
| id | username |
+----+----------+
| 9 | lczmx |
| 10 | jack |
| 11 | tom |
| 12 | mike |
+----+----------+
4 rows in set (0.00 sec)
mysql>
利用SQLAlchemy获取数据:
from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, text
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
with engine.connect() as conn:
# 执行
result = conn.execute(text("select * from user_account;"))
for row in result.all():
# 使用f-strings 格式化字符串
print(f"id: {row.id:3}, username: {row.username:20}")
# 打印的结果:
"""
id: 9, username: lczmx
id: 10, username: jack
id: 11, username: tom
id: 12, username: mike
"""
conn.commit()
ORM
和Core一样, ORM也有一定的使用步骤:
- 配置数据库连接, 见上文: 数据库的连接的格式
- 创建会话
- 创建表
- 使用接口, 增删改查数据
- 拿到返回数据, 执行其他代码
在学习SQLAlcehmy的ORM之前, 建议先了解一些概念, 以免后面会混淆
会话
Session
会话是SQLAlchemy ORM与数据库的交互对象
它可以管理建立连接中engine
, 并为通过会话加载或与会话关联的对象提供标识映射 (identity map
)
在使用时与Connection
非常相似, 你可以对比着使用Base
通过sqlalchemy.orm.declarative_base
创建
作为定义表的基类, 内部有包含MetaData
对象
可以类似于Django
一样定义表
在SQLAlchemy
中, session
是一个连接池, 的由其管理, 因此, 假如我们需要操作数据库的话, 需要在session
中拿到Connection
(连接)
创建会话
SQLAlchemy
提供了两种创建会话的方法:
sqlalchemy.orm.Session
from sqlalchemy import create_engine from sqlalchemy.orm import Session # 创建引擎 engine = create_engine('postgresql://scott:tiger@localhost/') # 创建会话 # 以下with可以简写成 with Session(engine) as session, session.begin(): with Session(engine) as session: # 开启自动提交 with session.begin(): # add方法 会将some_object 保存到数据库 # session.add(some_object) # session.add(some_other_object) pass
sqlalchemy.orm.sessionmaker
from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker # 创建引擎 engine = create_engine('postgresql://scott:tiger@localhost/') # 创建session Session = sessionmaker(engine) # 一般使用 with Session() as session: # session.add(some_object) # session.add(some_other_object) # 提交 session.commit() # 自动提交 with Session.begin() as session: # session.add(some_object) # session.add(some_other_object) pass
虽然有两种方法创建会话, 但我们一般使用sessionmaker
创建会话
另外补充一下session
的其它使用方式:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
engine = create_engine('postgresql://scott:tiger@localhost/')
Session = sessionmaker(engine)
# 从连接指定到session
with engine.connect() as connection:
with Session(bind=connection) as session:
# 一些操作
pass
下面列出session
的一些常用方法, 增删改查数据时要用到
例子:
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
stmt = text("SELECT id, name FROM userinfo WHERE id > :y").bindparams(y=1)
with Session(engine) as session:
result = session.execute(stmt)
print(result.all())
# [(2, 'name2'), (3, 'name2')]
# ROLLBACK
在ORM中创建表
使用ORM时, 我们也需要MetaData
, 不同的是, 我们是通过sqlalchemy.orm.registry
构造的. 而且, 我们不需要像Core
那样直接声明Table
, 而是继承某个公共基类 (Base
), 添加属性即可. 有两种方式定义基类.
方式一:
from sqlalchemy.orm import registry
mapper_registry = registry()
print(mapper_registry.metadata) # MetaData对象
# 公共基类
Base = mapper_registry.generate_base()
方法二:
from sqlalchemy.orm import declarative_base
# 内部 return registry(...).generate_base(...)
Base = declarative_base()
现在你可以像在Django ORM
中一样, 定义表并在数据库中创建表, 每一个Column
表示一列数据, 关于Column
的写法, 见: Column定义
from sqlalchemy import Column, String, Integer, create_engine, SMALLINT, Boolean, ForeignKey
from sqlalchemy.orm import relationship, declarative_base, sessionmaker
# 导入公共基类
Base = declarative_base()
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
class Student(Base):
__tablename__ = "student"
sid = Column("sid", Integer, primary_key=True)
name = Column("name", String(32), nullable=False, index=True, comment="姓名")
age = Column("age", SMALLINT, nullable=False, comment="年龄")
gender = Column("gender", Boolean, nullable=False, comment="性别, True: 男, False: 女")
class Course(Base):
__tablename__ = "course"
cid = Column("cid", Integer, primary_key=True)
name = Column("name", String(10), nullable=False, comment="科目名")
tid = Column("tid", ForeignKey("teacher.tid"), comment="课程教师")
class Teacher(Base):
__tablename__ = "teacher"
tid = Column("tid", Integer, primary_key=True)
name = Column("name", String(10), nullable=False, comment="教师名")
class Score(Base):
__tablename__ = "score"
sid = Column("sid", Integer, primary_key=True)
score = Column("score", SMALLINT, nullable=False, comment="成绩")
student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")
Base.metadata.create_all(bind=engine)
"""
-- 对于sql
CREATE TABLE student (
sid INTEGER NOT NULL AUTO_INCREMENT,
name VARCHAR(32) NOT NULL COMMENT '姓名',
age SMALLINT NOT NULL COMMENT '年龄',
gender BOOL NOT NULL COMMENT '性别, True: 男, False: 女',
PRIMARY KEY (sid)
)
CREATE TABLE teacher (
tid INTEGER NOT NULL AUTO_INCREMENT,
name VARCHAR(10) NOT NULL COMMENT '教师名',
PRIMARY KEY (tid)
)
CREATE TABLE course (
cid INTEGER NOT NULL AUTO_INCREMENT,
name VARCHAR(10) NOT NULL COMMENT '科目名',
tid INTEGER COMMENT '课程教师',
PRIMARY KEY (cid),
FOREIGN KEY(tid) REFERENCES teacher (tid)
)
CREATE TABLE score (
sid INTEGER NOT NULL AUTO_INCREMENT,
score SMALLINT NOT NULL COMMENT '成绩',
student_id INTEGER COMMENT '成绩所属学生',
course_id INTEGER COMMENT '成绩所属科目',
PRIMARY KEY (sid),
FOREIGN KEY(student_id) REFERENCES student (sid),
FOREIGN KEY(course_id) REFERENCES course (cid)
)
"""
增删改查数据
插入数据
接上文 "在ORM中创建表" 中的表
1.x的接口与2.0的接口一样, 都是调用session.add(instance)
方法添加到数据库 (add
方法下次刷新操作时, 将instance
保存到数据库)
注意: 自动生成的数据, 在未插入到数据库之前, 都为None
, 如: 自动生成的主键
from sqlalchemy import Column, String, Integer, create_engine, SMALLINT, Boolean, ForeignKey
from sqlalchemy.orm import relationship, declarative_base, sessionmaker
from sqlalchemy.orm import Session
from typing import Any
# 导入公共基类
Base = declarative_base()
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
class Student(Base):
__tablename__ = "student"
sid = Column("sid", Integer, primary_key=True)
name = Column("name", String(32), nullable=False, index=True, comment="姓名")
age = Column("age", SMALLINT, nullable=False, comment="年龄")
gender = Column("gender", Boolean, nullable=False, comment="性别, True: 男, False: 女")
class Course(Base):
__tablename__ = "course"
cid = Column("cid", Integer, primary_key=True)
name = Column("name", String(10), nullable=False, comment="科目名")
tid = Column("tid", ForeignKey("teacher.tid"), comment="课程教师")
class Teacher(Base):
__tablename__ = "teacher"
tid = Column("tid", Integer, primary_key=True)
name = Column("name", String(10), nullable=False, comment="教师名")
class Score(Base):
__tablename__ = "score"
sid = Column("sid", Integer, primary_key=True)
score = Column("score", SMALLINT, nullable=False, comment="成绩")
student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(bind=engine)
# 一般将 添加到数据库 封装成一个函数
def create_data(db: Session, target_cls: Any, **kwargs):
try:
cls_obj = target_cls(**kwargs)
# 添加一个
db.add(cls_obj)
# 添加多个:
# db.add_all([obj1, obj2, ...])
db.commit()
# 手动将 数据 刷新到数据库
db.refresh(cls_obj)
return cls_obj
except Exception as e:
# 别忘记发生错误时回滚
db.rollback()
raise e
session = SessionLocal()
# -------------- 创建学生数据
student = create_data(session, Student, sid=1, name="张三", age=22, gender=True)
# -------------- 创建教师数据
teacher = create_data(session, Teacher, tid=1, name="语文老师")
# -------------- 创建课程数据
course = create_data(session, Course, cid=1, name="语文", tid=teacher.tid)
# -------------- 创建成绩数据
score = create_data(session, Score, sid=1, score=89, student_id=student.sid, course_id=course.cid)
总的来说, 插入数据代码一般为:
# 1. 实例化一个表类
db_city = CityTable(....)
# 2. 调用session的add方法
session.add(db_city)
# 3. 调用session的commit方法 提交事务
session.commit()
# 4. 手动调用session的refresh方法 将数据刷新到数据库
session.refresh(db_city)
删除数据
1.x的方法
主要步骤是先查询再删除, 一般形式为: session.query(...).filter(...).delete()
from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker
# 导入公共基类
Base = declarative_base()
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
class Score(Base):
__tablename__ = "score"
sid = Column("sid", Integer, primary_key=True)
score = Column("score", SMALLINT, nullable=False, comment="成绩")
student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# 方法一: 调用 session.delete 方法
s = session.query(Score).filter(Score.score == 59).first()
session.delete(s)
# 方法二: 查询后直接删除
session.query(Score).filter(Score.score == 59).delete()
session.commit()
2.0的方法
像Core一样删除数据, 即delte(...).where(...)
from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import select, delete
# 导入公共基类
Base = declarative_base()
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
class Score(Base):
__tablename__ = "score"
sid = Column("sid", Integer, primary_key=True)
score = Column("score", SMALLINT, nullable=False, comment="成绩")
student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
session.execute(
delete(Score).where(Score.sid == 1)
)
session.commit()
修改数据
1.x的方法
主要步骤是先查询再更新, 即: session.query(...).filter(...).update(...)
from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker
# 导入公共基类
Base = declarative_base()
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
class Score(Base):
__tablename__ = "score"
sid = Column("sid", Integer, primary_key=True)
score = Column("score", SMALLINT, nullable=False, comment="成绩")
student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
row = session.query(Score).filter(Score.score == 59).update({"score": 60})
print(f"修改的行数: {row}")
session.commit()
2.0的方法
同样和Core一样, 使用update(...).where(...).values(...)
的形式更新数据
from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import update, bindparam
# 导入公共基类
Base = declarative_base()
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
class Score(Base):
__tablename__ = "score"
sid = Column("sid", Integer, primary_key=True)
score = Column("score", SMALLINT, nullable=False, comment="成绩")
student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# 一般更新
result1 = session.execute(update(Score).where(Score.score == 59).values(score=60))
print(f"受影响行数: {result1.rowcount}")
# # 更新数据 加上 原来的数据
result2 = session.execute(
update(Score).where(Score.score == 59).values(score=Score.score + 1))
print(f"受影响行数: {result2.rowcount}") # 受影响行数: 1
# 以字典的形式, 替换更新多个值
result3 = session.execute(
update(Score).where(Score.score == bindparam('old_score')).values(score=bindparam('new_score')),
[
{"old_score": 59, "new_score": 60},
]
)
print(f"受影响行数: {result3.rowcount}")
session.commit()
查询数据
1.x的方法
在SQLAlchemy1.x查询方式中, 使用Query
对象进行查询, 类似于Django ORM
的管理器, 可以较为简单地查询数据
假如要查询的表如下:
class User(Base):
__tablename__ = "User" # 设置表名
uid = Column(Integer, primary_key=True)
username = Column(String(80), unique=True)
email = Column(String(120), unique=True)
tags = Column(String(120))
def __repr__(self):
return '<User %r>' % self.username
表的数据:
mysql> select * from User;
+-----+----------+-------------------+------+
| uid | username | email | tags |
+-----+----------+-------------------+------+
| 1 | 张三 | [email protected] | 热情 |
| 2 | 李四 | [email protected] | 热情 |
| 3 | 王五 | [email protected] | 开朗 |
| 4 | lczmx | [email protected] | 热情 |
+-----+----------+-------------------+------+
4 rows in set (0.10 sec)
mysql>
使用例子:
from sqlalchemy import Column, Integer, create_engine, String
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import not_, or_, desc
# 导入公共基类
Base = declarative_base()
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
class User(Base):
__tablename__ = "User" # 设置表名
uid = Column(Integer, primary_key=True)
username = Column(String(80), unique=True)
email = Column(String(120), unique=True)
tags = Column(String(120))
def __repr__(self):
return '<User %r>' % self.username
Base.metadata.create_all(bind=engine)
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# ------------------ 查询所有User数据
session.query(User).all()
"""
对应SQL
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
"""
# ------------------ 查询有多少条数据
session.query(User).count()
"""
对应SQL
SELECT count(*) AS count_1 FROM (SELECT `User`.uid AS `User_uid`,
`User`.username AS `User_username`, `User`.email AS `User_email`,
`User`.tags AS `User_tags` FROM `User`) AS anon_1
"""
# ------------------ 查询第1条数据
session.query(User).first()
"""
对应SQL
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
LIMIT 1
"""
# ------------------ 根据主键查询
session.query(User).get(1)
"""
对应SQL
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
WHERE `User`.uid = 1
"""
# ------------------ 简单查询, 使用 关键字实参 的形式来设置字段名
session.query(User).filter_by(uid=1).all()
"""
对应SQL
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
WHERE `User`.uid = 1
"""
# ------------------ 复杂查询, 可以多个表一起,使用 恒等式'==' 等形式 来设置条件
session.query(User).filter(User.uid == 1).all()
"""
对应SQL
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
WHERE `User`.uid = 1
"""
# ------------------ filter 查询开头
session.query(User).filter(User.username.startswith("l")).all()
"""
对应SQL
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
WHERE (`User`.username LIKE concat('l', '%%'))
"""
# ------------------ filter 查询结尾
session.query(User).filter(User.username.endswith("x")).all()
"""
对应SQL
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
WHERE (`User`.username LIKE concat('%%', 'x'))
"""
# ------------------ filter 查询是否包含
session.query(User).filter(User.username.contains("lcz")).all()
"""
对应SQL
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
WHERE (`User`.username LIKE concat(concat('%%', "lcz", '%%')))
"""
# ------------------ filter 模糊查询
session.query(User).filter(User.username.like("%cz%")).all()
"""
对应SQL
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
WHERE `User`.username LIKE "%cz%"
"""
# ------------------ filter 条件取反 (not)
session.query(User).filter(not_(User.username == "lczmx")).all()
"""
对应SQL
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
WHERE `User`.username != "lczmx"
"""
# ------------------ filter条件 或 (or), 默认为and
session.query(User).filter(
or_(User.uid == 1, User.uid == 3), ).all()
"""
对应SQL
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
WHERE `User`.uid = 1 OR `User`.uid = 3
"""
# ------------------ filter条件 and or not 一起使用
session.query(User).filter(or_(User.uid == 1, User.uid == 4), User.username == "lczmx",
not_(User.email == "[email protected]")).all()
"""
对应SQL
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
WHERE (`User`.uid = 1 OR `User`.uid = 4)
AND `User`.username = "lczmx" AND `User`.email = "[email protected]"
"""
# ------------------ filter 取反查询
session.query(User).filter(User.username != "lczmx").all()
"""
对应SQL
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
WHERE `User`.username != "lczmx";
"""
# ------------------ 查询uid为[1, 3, 5, 7, 9]的用户
session.query(User).filter(User.uid.in_([1, 3, 5, 7, 9])).all()
"""
对应SQL
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
WHERE `User`.uid IN (1, 3, 5, 7, 9)
"""
# ------------------ 分组查询
# !! 注意不是query(User), 因为Query(User)对应的SQL为:
# SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
# `User`.email AS `User_email`, `User`.tags AS `User_tags`
session.query(User.tags).group_by(User.tags).all()
"""
对应SQL
SELECT `User`.tags AS `User_tags` FROM `User` GROUP BY `User`.tags
"""
# ------------------ 排序 顺序
session.query(User).order_by(User.uid).all()
"""
对应SQL
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags`
FROM `User` ORDER BY `User`.uid
"""
# ------------------ 排序 倒序
session.query(User).order_by(desc(User.uid)).all()
"""
对应SQL
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
ORDER BY `User`.uid DESC
"""
# ------------------ 去重
session.query(User.tags).distinct().all()
"""
对应SQL:
SELECT DISTINCT `User`.tags AS `User_tags` FROM `User`;
"""
# ------------------ 取几条数据
session.query(User).limit(2).all()
"""
对应SQL:
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
LIMIT 2;
"""
# ------------------ 跳过几条个数据
session.query(User).offset(1).limit(2).all()
"""
对应SQL
SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
`User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
LIMIT 1, 2;
"""
关于query返回的对象query
的返回对象为sqlalchemy.orm.query.Query
对象, 你可以与Result
对象进行对比, 主要有以下的方法:
all()
返回由表对象组成的列表first()
返回第一个结果 (表对象), 内部执行limit
SQLone()
只返回一行数据或引发异常 (无数据时抛出:sqlalchemy.exc.NoResultFound
, 多行数据时抛出:sqlalchemy.exc.MultipleResultsFound
)one_or_none()
最多返回一行数据或引发异常 (无数据时返回None
, 多行数据时抛出:sqlalchemy.exc.MultipleResultsFound
)scalar()
获取第一行的第一列数据. 如果没有要获取的行, 则返回None
, 多行数据时抛出:sqlalchemy.exc.MultipleResultsFound
2.0的方法
2.0的返回结果也是Result
对象, 关于Result
对象, 见: Result
注意: 由于2.0的查询方式, Core和ORM都可以使用, 所以放在一起, 见下文: 查询数据详解
relationship连表操作
我们自定义外键时, 一般的步骤是:
- 子表使用
字段名= Column(Integer, ForeignKey('主表名.主键'))
的格式定义 - 除此外, 还需要在主表中定义
relationship
用于子表与主表之间的跨表查询
完整例子:
from sqlalchemy import Column, Integer, create_engine, String, Text, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, relationship
# 导入公共基类
Base = declarative_base()
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
class User(Base):
__tablename__ = 'user'
uid = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(16), nullable=False)
password = Column(String(32), nullable=False)
# Article是类名 user是反向访问的属性名称
article = relationship("Article", backref="user")
"""
article = relationship("Article", backref="user")
相当于:
class User(Base):
# Article是类名 user是反向访问的属性名称
article = relationship("Article", back_populates="user")
class Article(Base):
# User是类名 addresses是反向访问的属性名称
user = relationship("User", back_populates="addresses")
"""
def __repr__(self):
return "<User %s>" % self.username
class Article(Base):
__tablename__ = 'article'
aid = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String(36), nullable=False)
content = Column(Text, nullable=False)
author_id = Column(Integer, ForeignKey("user.uid"))
def __repr__(self):
return "<User %s>" % self.title
Base.metadata.create_all(bind=engine)
关于relationshiprelationship
在定义外键时, 有非常重要的作用, 如: 1. 跨表操作 2. 设置删除主表数据时子表的值relationship
的参数很多, 这里只列出常用的几个, 全部参数见文档: relationship
back_populates
指定反向访问的属性名称backref
快捷设置两个relationship
(设置back_populates
的话, 要设置两个表)cascade
用于控制修改数据时的选项比如:
articles = relationship("Article",cascade="save-update,delete")
order_by
子表列表的排序方式# 倒序 article = relationship("Article", backref="user", order_by="Article.aid.desc()") # 正序 article = relationship("Article", backref="user", order_by="Article.aid")
本部分包括Core与ORM的跨表增删改查操作
select `user`.uid as uid, `user`.username, `user`.password,
(select GROUP_CONCAT(`article`.title) from article
where `article`.author_id = `user`.uid) as article
from user;
+-----+----------+----------+---------------+
| uid | username | password | article |
+-----+----------+----------+---------------+
| 1 | 张三 | 12345 | C++入门,C入门 |
| 2 | 李四 | 12346 | python入门 |
+-----+----------+----------+---------------+
通过relationship双向访问
即直接通过relationship
访问主表或子表
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# +++++++++++++++++++++++++ 子表访问主表, 返回主表对象
"""
实质上内部执行的SQL
SELECT user.uid AS user_uid, user.username AS user_username, user.password AS user_password
FROM user
WHERE user.uid = %(pk_1)s
"""
# ----------- 1.x方法
article_1x = session.query(Article).first()
print(article_1x.user) # <User 张三>
# ----------- 2.0 方法
article_20 = session.execute(select(Article)).first()
print(article_20.Article.user) # <User 张三>
# +++++++++++++++++++++++++ 主表访问子表, 返回子表列表
# 实质是 sqlalchemy.orm.collections.InstrumentedList 对象
# 是List的子类
"""
实质上内部执行的SQL
SELECT article.aid AS article_aid, article.title AS article_title,
article.content AS article_content, article.author_id AS article_author_id
FROM article
WHERE %(param_1)s = article.author_id ORDER BY article.aid
"""
# ----------- 1.x方法
user_1x = session.query(User).first()
print(user_1x.article)
# [<User C++入门>, <User C入门>]
# ----------- 2.0方法
user_20 = session.execute(select(User)).first()
print(user_20.User.article)
# [<User C++入门>, <User C入门>]
通过relationship修改关联关系
上面例子中说过, 主表.relationship字段
是InstrumentedList
对象 (类似于List
), 我们可以修改它, 然后调用commit
方法即可. 子表.relationship字段
是对应的主表, 可以修改为自己想要的, 同样调用commit
方法即可
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# +++++++++++++++++++++++++ 子表修改属于的主表
"""
对于SQL
UPDATE article
SET author_id=%(author_id)s
WHERE article.aid = %(article_aid)s
"""
# ----------- 1.x方法
lisi = session.query(User).get(2)
article_1x = session.query(Article).first()
print(article_1x.user) # <User 张三>
article_1x.user = lisi # 改为 <User 李四>
session.commit() # 记得提交
# ----------- 2.0 方法
zhangsan = session.execute(select(User).where(User.uid == 1)).first()
article_20 = session.execute(select(Article)).first()
print(article_20.Article.user) # <User 李四>
article_20.Article.user = zhangsan.User # 改为 <User 张三>
session.commit() # 记得提交
# +++++++++++++++++++++++++ 主表修改子表列表
# ----------- 1.x方法 增加
user_1x = session.query(User).first()
# 添加一个新的子表数据
# 会在Article中插入一条新的数据
user_1x.article.append(Article(title="javascript 入门", content="console.log(hello world)"))
session.commit() # 记得提交
# ----------- 2.0 方法 移除
user_20 = session.execute(select(User)).first()
print(user_20.User.article) # [<User C++入门>, <User C入门>, <User javascript 入门>]
article_js = session.execute(select(Article).where(Article.aid == 4)).scalar()
# 从主表中移除与子表的关系
user_20.User.article.remove(article_js)
session.commit() # 记得提交
通过relationship修改子/主表数据
同样非常简单, 找到对应的类, 然后修改数据并commit
即可
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# +++++++++++++++++++++++++ 通过子表修改对应主表的数据
# ----------- 1.x方法
article_1x = session.query(Article).first()
print(article_1x.user) # <User 张三>
article_1x.user.username = "张三二号"
session.commit() # 记得提交
# ----------- 2.0 方法
article_20 = session.execute(select(Article)).first()
print(article_20.Article.user) # <User 张三二号>
article_20.Article.user.username = "张三"
session.commit() # 记得提交
# +++++++++++++++++++++++++ 通过主表修改对应子表的数据
# ----------- 1.x方法
user_1x = session.query(User).first()
print(user_1x.article) # [<User C++入门>, <User C入门>, <User javascript 入门>]
user_1x.article[-1].title = "js入门"
session.commit() # 记得提交
# ----------- 2.0 方法
user_20 = session.execute(select(User)).scalar()
print(user_20.article) # [<User C++入门>, <User C入门>, <User js入门>]
user_20.article[-1].title = "javascript 入门"
session.commit() # 记得提交
通过relationship查询数据
正向查询, 子表利用主表的条件查询, 使用has
, 条件和普通查询的条件一样
反向查询, 主表利用子表的条件查询, 使用any
, 条件和普通查询的条件一样
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# +++++++++++++++++++++++++ 反向查询
"""
对应的SQL
SELECT user.uid AS user_uid, user.username AS user_username, user.password AS user_password
FROM user
WHERE EXISTS (SELECT 1
FROM article
WHERE user.uid = article.author_id AND article.title LIKE 'python%')
"""
# ********* 查询有以python开头的Article的User
# ----------- 1.x方法
user_1x = session.query(User).filter(User.article.any(Article.title.like("python%")))
print(user_1x.all()) # [<User 李四>]
# ----------- 2.0方法
user_20 = session.execute(
select(User).where(User.article.any(Article.title.like("python%"))))
print(user_20.all()) # [(<User 李四>,)]
# +++++++++++++++++++++++++ 正向查询
"""
对应的SQL
SELECT article.aid AS article_aid, article.title AS article_title,
article.content AS article_content, article.author_id AS article_author_id
FROM article
WHERE EXISTS (SELECT 1
FROM user
WHERE user.uid = article.author_id AND (user.username LIKE concat(concat('%%', '四', '%%')))
"""
# ********* 查询User表中username有 四 的Article
# ----------- 1.x方法
article_1x = session.query(Article).filter(Article.user.has(User.username.contains("四")))
print(article_1x.all()) # [<User python入门>]
# ----------- 2.0方法
article_20 = session.execute(
select(Article).where(Article.user.has(User.username.contains("四"))))
print(article_20.all()) # [(<User python入门>,)]
建立多对多关系
可以通过relationship
便捷使用多对多关系
from sqlalchemy import Table, Text, Column, ForeignKey
# 第三张表
post_keywords = Table("post_keywords", Base.metadata,
Column('post_id', ForeignKey('posts.id'), primary_key=True),
Column('keyword_id', ForeignKey('keywords.id'), primary_key=True)
)
class BlogPost(Base):
__tablename__ = 'posts'
id = Column(Integer, primary_key=True)
headline = Column(String(255), nullable=False)
body = Column(Text)
# 多对多关系 BlogPost<->Keyword
keywords = relationship('Keyword',
secondary=post_keywords,
back_populates='posts')
def __init__(self, headline, body):
self.headline = headline
self.body = body
def __repr__(self):
return "BlogPost(%r, %r)" % (self.headline, self.body)
class Keyword(Base):
__tablename__ = 'keywords'
id = Column(Integer, primary_key=True)
keyword = Column(String(50), nullable=False, unique=True)
# 多对多关系 Keyword<->BlogPost
posts = relationship('BlogPost',
secondary=post_keywords,
back_populates='keywords')
def __init__(self, keyword):
self.keyword = keyword
Base.metadata.create_all(bind=engine)
添加操作例子:
from sqlalchemy import select
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# 比如添加一篇文章, 为文章添加多个keyword
blog = BlogPost(headline="起飞!", body="我是文章的内容")
# 获取子表列表 并 append
blog.keywords.append(Keyword("新闻"))
blog.keywords.append(Keyword("热门"))
session.add(blog)
session.commit()
# ------- 添加第二篇文章
keyword1 = session.execute(
select(Keyword).filter_by(keyword="热门")
).scalar()
new_blog = BlogPost(headline="震惊!", body="我是第二篇文章的内容")
new_blog.keywords.append(keyword1)
session.add(new_blog)
session.commit()
查询操作例子:
from sqlalchemy import select
SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
# 查询所有 "热门" 文章
blog = session.execute(
select(BlogPost).where(BlogPost.keywords.any(Keyword.keyword == "热门"))
).all()
print(blog)
# [(BlogPost('起飞!', '我是文章的内容'),),
# (BlogPost('震惊!', '我是第二篇文章的内容'),)]
项目示范
一般来说, 我们使用SQLAlchemy的步骤大多相同, 下面
文件结构:
+--- database.py # 用于 初始化session 和 公共基类
+--- models.py # 定义表
+--- crud.py # 封装增删改查的方法
+--- schemas.py # 定义pydantic模型, 用于格式化已经取得的数据 [可选]
+--- main.py # 执行主逻辑
database.py
初始化session
和 公共基类:
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
# 数据库的URL
# 替换成自己的
SQLALCHEMY_DATABASE_URL = 'sqlite:///./coronavirus.sqlite3'
# 创建引擎
engine = create_engine(
# echo=True表示引擎将用repr()函数记录所有语句及其参数列表到日志
SQLALCHEMY_DATABASE_URL, encoding='utf-8', echo=True
)
# SessionLocal用于对数据的增删改查
# flush()是指发送数据库语句到数据库,但数据库不一定执行写入磁盘;commit()是指提交事务,将变更保存到数据库文件
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, expire_on_commit=True)
# 创建基本映射类, 用于创建表
Base = declarative_base(bind=engine, name='Base')
models.py
定义表结构
from sqlalchemy import Column, String, Integer, BigInteger, Date, DateTime, ForeignKey, func
from sqlalchemy.orm import relationship
# 导入公共基类
from .database import Base
class City(Base):
__tablename__ = 'city'
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
province = Column(String(100), unique=True, nullable=False, comment='省/直辖市')
country = Column(String(100), nullable=False, comment='国家')
country_code = Column(String(100), nullable=False, comment='国家代码')
country_population = Column(BigInteger, nullable=False, comment='国家人口')
data = relationship('Data', back_populates='city') # 'Data'是关联的类名;back_populates来指定反向访问的属性名称
created_at = Column(DateTime, server_default=func.now(), comment='创建时间')
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新时间')
__mapper_args__ = {"order_by": country_code} # 默认是正序,倒序加上.desc()方法
def __repr__(self):
return f'{self.country}_{self.province}'
class Data(Base):
__tablename__ = 'data'
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
city_id = Column(Integer, ForeignKey('city.id'), comment='所属省/直辖市') # ForeignKey里的字符串格式不是类名.属性名,而是表名.字段名
date = Column(Date, nullable=False, comment='数据日期')
confirmed = Column(BigInteger, default=0, nullable=False, comment='确诊数量')
deaths = Column(BigInteger, default=0, nullable=False, comment='死亡数量')
recovered = Column(BigInteger, default=0, nullable=False, comment='痊愈数量')
city = relationship('City', back_populates='data') # 'City'是关联的类名;back_populates来指定反向访问的属性名称
created_at = Column(DateTime, server_default=func.now(), comment='创建时间')
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新时间')
__mapper_args__ = {"order_by": date.desc()} # 按日期降序排列
def __repr__(self):
return f'{repr(self.date)}:确诊{self.confirmed}例'
crud.py
封装增删改查的方法:
from sqlalchemy.orm import Session
import models
import schemas
def get_city(db: Session, city_id: int):
return db.query(models.City).filter(models.City.id == city_id).first()
def get_city_by_name(db: Session, name: str):
return db.query(models.City).filter(models.City.province == name).first()
def get_cities(db: Session, skip: int = 0, limit: int = 10):
return db.query(models.City).offset(skip).limit(limit).all()
def create_city(db: Session, city: schemas.CreateCity):
"""
创建City数据
"""
db_city = models.City(**city.dict())
db.add(db_city)
db.commit()
db.refresh(db_city)
return db_city
def get_data(db: Session, city: str = None, skip: int = 0, limit: int = 10):
if city:
# 外键关联查询,这里不是像Django ORM那样Data.city.province
return db.query(models.Data).filter(models.Data.city.has(province=city))
return db.query(models.Data).offset(skip).limit(limit).all()
def create_city_data(db: Session, data: schemas.CreateData, city_id: int):
"""
创建Data数据
"""
db_data = models.Data(**data.dict(), city_id=city_id)
db.add(db_data)
db.commit()
db.refresh(db_data)
return db_data
schemas.py
定义pydantic模型, 用于格式化已经取得的数据:
from datetime import date as date_
from datetime import datetime
from pydantic import BaseModel
class CreateData(BaseModel):
date: date_
confirmed: int = 0
deaths: int = 0
recovered: int = 0
class CreateCity(BaseModel):
province: str
country: str
country_code: str
country_population: int
class ReadData(CreateData):
id: int
city_id: int
updated_at: datetime
created_at: datetime
class Config:
orm_mode = True
class ReadCity(CreateCity):
id: int
updated_at: datetime
created_at: datetime
class Config:
orm_mode = True
main.py
执行主逻辑:
from sqlalchemy.orm import Session
import crud
import schemas
from database import engine, Base, SessionLocal
from models import City, Data
# 创建表, 已经存在的将被忽略
Base.metadata.create_all(bind=engine)
db = SessionLocal()
# 调用 crud
db_city = crud.get_city_by_name(db, name="广东省")
if db_city:
raise Exception("City already registered")
# 创建数据
city = City(...)
crud.create_city(db=db, city=city)
查询数据详解
只有涉及SQL 的查询数据, 必然绕不开FROM
WHERE
SELECT
GROUP BY
HAVING
ORDER BY
LIMIT
INNER JOIN ... ON
LEFT JOIN ... ON
UNION
这些SQL查询语法, 那么它们在SQLAlchemy中是如何表示的呢? 实际上和SQL语句一样, SQLAlchemy的语法也有select
where
join
order_by
group_by
having
等 ...
注意: 这些方法在Core与ORM中都适用
刚接触可能会觉得比较复杂, 但是假如有SQL基础的话, 用起来比较简单.
要查询的表结构为:
# +++++++++++++++++++ 使用Core定义 +++++++++++++++++++
from sqlalchemy import MetaData, create_engine
from sqlalchemy import Table, Column, Integer, String, ForeignKey, Text
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".
format(**DATABASE_CONFIG), echo=True, future=True)
metadata_obj = MetaData()
user_table = Table(
"user",
metadata_obj,
Column("uid", Integer, primary_key=True, autoincrement=True),
Column("username", String(16), nullable=False),
Column("password", String(32), nullable=False),
)
article_table = Table(
'article',
metadata_obj,
Column("aid", Integer, primary_key=True, autoincrement=True),
Column("title", String(36), nullable=False),
Column("content", Text, nullable=False),
Column("author_id", Integer, ForeignKey("user.uid"))
)
metadata_obj.create_all(bind=engine)
# +++++++++++++++++++ 使用ORM定义 +++++++++++++++++++
from sqlalchemy import Column, Integer, create_engine, String, Text, ForeignKey
from sqlalchemy.orm import declarative_base, relationship
# 导入公共基类
Base = declarative_base()
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".
format(**DATABASE_CONFIG), echo=True, future=True)
class User(Base):
__tablename__ = 'user'
uid = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(16), nullable=False)
password = Column(String(32), nullable=False)
# Article是类名 user是反向访问的属性名称
article = relationship("Article", backref="user")
def __repr__(self):
return "<User %s>" % self.username
class Article(Base):
__tablename__ = 'article'
aid = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String(36), nullable=False)
content = Column(Text, nullable=False)
author_id = Column(Integer, ForeignKey("user.uid"))
def __repr__(self):
return "<User %s>" % self.title
Base.metadata.create_all(bind=engine)
表数据为:
mysql> select * from article, user where user.uid=article.author_id;
+-----+------------+--------------------+-----------+-----+----------+----------+
| aid | title | content | author_id | uid | username | password |
+-----+------------+--------------------+-----------+-----+----------+----------+
| 1 | C++入门 | c++ hello world | 1 | 1 | 张三 | 12345 |
| 2 | C入门 | c hello world | 1 | 1 | 张三 | 12345 |
| 3 | python入门 | print(hello world) | 2 | 2 | 李四 | 12346 |
+-----+------------+--------------------+-----------+-----+----------+----------+
3 rows in set (0.12 sec)
select
该函数可以指定要查询的表、列及添加别名
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
# 1 ++++++++++++++++++ 一般查询 ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password FROM user
WHERE user.uid = 2
"""
# -------------- Core
result_core_1 = conn.execute(select(user_table).where(user_table.c.uid == 2))
print(result_core_1.all()) # [(2, '李四', '12346')]
# -------------- ORM
result_orm_1 = session.execute(select(User).where(User.uid == 2))
print(result_orm_1.all()) # [(<User 李四>,)]
# 2 ++++++++++++++++++ 查询全部列 ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
"""
# -------------- Core
result_core_2 = conn.execute(select(user_table))
print(result_core_2.all()) # [(1, '张三', '12345'), (2, '李四', '12346')]
# -------------- ORM
result_orm_2 = session.execute(select(User))
print(result_orm_2.all()) # [(<User 张三>,), (<User 李四>,)]
# 3 ++++++++++++++++++ 查询指定的列 ++++++++++++++++++
"""
对应SQL
SELECT user.username
FROM user
"""
# -------------- Core
result_core_3 = conn.execute(select(user_table.c.username))
print(result_core_3.all()) # [('张三',), ('李四',)]
# -------------- ORM
result_orm_3 = session.execute(select(User.username))
print(result_orm_3.all()) # [('张三',), ('李四',)]
# 4 ++++++++++++++++++ 两个表 联合查询 ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user, article
WHERE user.uid = article.author_id
"""
# -------------- Core
result_core_4 = conn.execute(
select(user_table.c.username, article_table.c.title).where(
user_table.c.uid == article_table.c.author_id))
# [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
print(result_core_4.all())
# -------------- ORM
result_orm_4 = session.execute(select(User.username, Article.title).where(
User.uid == Article.author_id))
# [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
print(result_orm_4.all())
# 5 ++++++++++++++++++ 为列添加别名 ++++++++++++++++++
"""
对应SQL
SELECT user.username AS name
FROM user
WHERE user.uid = 1
"""
# -------------- Core
result_core_5 = conn.execute(
select((user_table.c.username).label("name")).where(
user_table.c.uid == 1))
# print(result_core_5.all()) # [('张三',)]
# -------------- ORM
result_orm_5 = session.execute(
select((User.username).label("name")).where(
User.uid == 1))
# print(result_orm_5.all()) # [('张三',)]
# 利用别名 取值
print(result_core_5.first().name) # 张三
print(result_orm_5.first().name) # 张三
# 6 ++++++++++++++++++ 为表添加别名 ++++++++++++++++++
"""
对应SQL
SELECT user_1.username
FROM user AS user_1
WHERE user_1.uid = 1
"""
# -------------- Core
user_table_core_alias = user_table.alias()
result_core_6 = conn.execute(
select(user_table_core_alias.c.username).where(
user_table_core_alias.c.uid == 1))
print(result_core_6.all()) # [('张三',)]
# -------------- ORM
from sqlalchemy.orm import aliased
user_table_orm_alias = aliased(User)
result_orm_6 = session.execute(
select(user_table_orm_alias.username).where(
user_table_orm_alias.uid == 1))
print(result_orm_6.all()) # [('张三',)]
# 7 ++++++++++++++++++ 与text 结合, 添加额外列 ++++++++++++++++++
"""
对应SQL
SELECT now() AS now, user.username, '自定义字符'
FROM user
"""
from sqlalchemy import literal_column, text
# literal_column表示一列数据
# text可以转化成SQL
# -------------- Core
result_core_7 = conn.execute(
select(literal_column("now()").label("now"), user_table.c.username, text("'自定义字符'")))
print(result_core_7.all())
"""
[(datetime.datetime(2022, 1, 8, 18, 54, 44), '张三', '自定义字符'),
(datetime.datetime(2022, 1, 8, 18, 54, 44), '李四', '自定义字符')]
"""
# -------------- ORM
result_orm_7 = session.execute(
select(literal_column("now()").label("now"), User.username, text("'自定义字符'")))
print(result_orm_7.all())
"""
[(datetime.datetime(2022, 1, 8, 18, 59, 42), '张三', '自定义字符'),
(datetime.datetime(2022, 1, 8, 18, 59, 42), '李四', '自定义字符')]
"""
where
过滤数据, Core
中table.c.xxx
获取行, 假如是ORM的话为:类名.属性名
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
# 1 ++++++++++++++++++ 条件默认为and ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid < 3 AND user.uid > 1
"""
# -------------- Core
result_core_1 = conn.execute(
select(user_table).where(user_table.c.uid < 3, user_table.c.uid > 1))
print(result_core_1.all()) # [(2, '李四', '12346')]
# -------------- ORM
result_orm_1 = session.execute(select(User).where(User.uid < 3, User.uid > 1))
print(result_orm_1.all()) # [(<User 李四>,)]
# 2 ++++++++++++++++++ 修改条件为not or ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE (user.uid = 1 OR user.uid = 2) AND user.username != "李四"
"""
from sqlalchemy import or_, not_
# -------------- Core
result_core_2 = conn.execute(
select(user_table).where(
or_(user_table.c.uid == 1, user_table.c.uid == 2),
not_(user_table.c.username == "李四"),
))
print(result_core_2.all()) # [(1, '张三', '12345')]
# -------------- ORM
result_orm_2 = session.execute(select(User).where(
or_(User.uid == 1, User.uid == 2),
not_(User.username == "李四")))
print(result_orm_2.all()) # [(<User 张三>,)]
# 3 ++++++++++++++++++ startswith ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE (user.username LIKE concat('张', '%%'))
"""
# -------------- Core
result_core_3 = conn.execute(
select(user_table).where(
user_table.c.username.startswith("张")))
print(result_core_3.all()) # [(1, '张三', '12345')]
# -------------- ORM
result_orm_3 = session.execute(select(User).where(
User.username.startswith("张")))
print(result_orm_3.all()) # [(<User 张三>,)]
# 4 ++++++++++++++++++ endswith ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE (user.username LIKE concat('%%', '三'))
"""
# -------------- Core
result_core_4 = conn.execute(
select(user_table).where(
user_table.c.username.endswith("三")))
print(result_core_4.all()) # [(1, '张三', '12345')]
# -------------- ORM
result_orm_4 = session.execute(select(User).where(
User.username.endswith("三")))
print(result_orm_4.all()) # [(<User 张三>,)]
# 5 ++++++++++++++++++ endswith ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE (user.username LIKE concat(concat('%%', '三', '%%'))
"""
# -------------- Core
result_core_5 = conn.execute(
select(user_table).where(
user_table.c.username.contains("三")))
print(result_core_5.all()) # [(1, '张三', '12345')]
# -------------- ORM
result_orm_5 = session.execute(select(User).where(
User.username.contains("三")))
print(result_orm_5.all()) # [(<User 张三>,)]
# 6 ++++++++++++++++++ like ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.username LIKE '%三'
"""
# -------------- Core
result_core_6 = conn.execute(
select(user_table).where(
user_table.c.username.like("%三")))
print(result_core_6.all()) # [(1, '张三', '12345')]
# -------------- ORM
result_orm_6 = session.execute(select(User).where(
User.username.like("%三")))
print(result_orm_6.all()) # [(<User 张三>,)]
# 7 ++++++++++++++++++ in ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid IN (1, 2)
"""
# -------------- Core
result_core_7 = conn.execute(
select(user_table).where(
user_table.c.uid.in_((1, 2))
))
print(result_core_7.all()) # [(1, '张三', '12345'), (2, '李四', '12346')]
# -------------- ORM
result_orm_7 = session.execute(
select(User).where(
User.uid.in_((1, 2))
))
print(result_orm_7.all()) # [(<User 张三>,), (<User 李四>,)]
# 8 ++++++++++++++++++ between ... and ... ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid BETWEEN 1 AND 2
"""
# -------------- Core
result_core_8 = conn.execute(
select(user_table).where(
user_table.c.uid.between(1, 2)
))
print(result_core_8.all()) # [(1, '张三', '12345'), (2, '李四', '12346')]
# -------------- ORM
result_orm_8 = session.execute(
select(User).where(
User.uid.between(1, 2)
))
print(result_orm_8.all()) # [(<User 张三>,), (<User 李四>,)]
# 9 ++++++++++++++++++ is null 或 is not null ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid IS NOT NULL
"""
from sqlalchemy import not_
# -------------- Core
result_core_9 = conn.execute(
select(user_table).where(
# user_table.c.uid.is_(None), # IS NULL
not_(user_table.c.uid.is_(None)), # IS NOT NULL
))
print(result_core_9.all()) # [(1, '张三', '12345'), (2, '李四', '12346')]
# -------------- ORM
result_orm_9 = session.execute(
select(User).where(
# User.uid.is_(None), # IS NULL
not_(User.uid.is_(None)), # IS NOT NULL
))
print(result_orm_9.all()) # [(<User 张三>,), (<User 李四>,)]
除此之外, 你还可以使用一些其他运算符, 详情见: Operator Reference
order_by
order_by用于排序
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
# 1 ++++++++++++++++++ 根据某一列排序 (默认升序) ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user ORDER BY user.uid
"""
# -------------- Core
result_core_1 = conn.execute(
select(user_table).order_by(user_table.c.uid))
print(result_core_1.all()) # [(1, '张三', '12345'), (2, '李四', '12346')]
# -------------- ORM
result_orm_1 = session.execute(
select(User).order_by(User.uid))
print(result_orm_1.all()) # [(<User 张三>,), (<User 李四>,)]
# 2 ++++++++++++++++++ 手动指定升序/降序 ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user ORDER BY user.uid ASC/DESC
"""
# -------------- Core
result_core_2 = conn.execute(
# select(user_table).order_by(user_table.c.uid.asc()) # 升序
select(user_table).order_by(user_table.c.uid.desc()) # 降序
)
print(result_core_2.all())
# [(1, '张三', '12345'), (2, '李四', '12346')] / [(2, '李四', '12346'), (1, '张三', '12345')]
# -------------- ORM
result_orm_2 = session.execute(
# select(User).order_by(User.uid.asc()) # 升序
select(User).order_by(User.uid.desc()) # 降序
)
print(result_orm_2.all())
# [(<User 张三>,), (<User 李四>,)] / [(<User 李四>,), (<User 张三>,)]
# 3 ++++++++++++++++++ 根据别名排序 ++++++++++++++++++
"""
对应SQL
SELECT user.username AS name
FROM user ORDER BY name DESC
"""
from sqlalchemy import desc, asc
# -------------- Core
result_core_3 = conn.execute(
select((user_table.c.username).label("name")).order_by(desc("name"))
)
print(result_core_3.all()) # [('张三',), ('李四',)]
# -------------- ORM
result_orm_3 = session.execute(
select((User.username).label("name")).order_by(desc("name"))
)
print(result_orm_3.all()) # [('张三',), ('李四',)]
group_by和having
group_by
用于分组, having
类似于where
, 但可以对已分组数据使用聚合函数
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, func
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
# 1 ++++++++++++++++++ group_by一般使用 ++++++++++++++++++
"""
对应SQL
SELECT count(article.author_id) AS count
FROM article GROUP BY article.author_id
"""
# func 内有很多内置函数
# -------------- Core
result_core_1 = conn.execute(
select(func.count(article_table.c.author_id).label("count")).group_by(article_table.c.author_id)
)
print(result_core_1.all()) # [(2,), (1,)]
# -------------- ORM
result_orm_1 = session.execute(
select(func.count(Article.author_id).label("count")).group_by(Article.author_id)
)
print(result_orm_1.all()) # [(2,), (1,)]
# 2 ++++++++++++++++++ group by + having ++++++++++++++++++
"""
对应SQL
SELECT count(article.author_id) AS count
FROM article GROUP BY article.author_id
HAVING count(article.author_id) > 1
"""
# func 内有很多内置函数
# -------------- Core
result_core_2 = conn.execute(
select(func.count(article_table.c.author_id).label("count")).group_by(
article_table.c.author_id).having(func.count(article_table.c.author_id) > 1)
)
print(result_core_2.all()) # [(2,)]
# -------------- ORM
result_orm_2 = session.execute(
select(func.count(Article.author_id).label("count")).group_by(
Article.author_id).having(func.count(Article.author_id) > 1)
)
print(result_orm_2.all()) # [(2,)]
limit和offset
limit
: 表示取几条数据, offset
: 表示要跳过多少条数据
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
# 1 ++++++++++++++++++ 仅使用LIMIT ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
LIMIT 1
"""
# -------------- Core
result_core_1 = conn.execute(select(user_table).limit(1))
print(result_core_1.all()) # [(1, '张三', '12345')]
# -------------- ORM
result_orm_1 = session.execute(select(User).limit(1))
print(result_orm_1.all()) # [(<User 张三>,)]
# 2 ++++++++++++++++++ 使用LIMIT和OFFSET ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
LIMIT 1, 1
"""
# -------------- Core
result_core_2 = conn.execute(select(user_table).limit(1).offset(1))
print(result_core_2.all()) # [(2, '李四', '12346')]
# -------------- ORM
result_orm_2 = session.execute(select(User).limit(1).offset(1))
print(result_orm_2.all()) # [(<User 李四>,)]
去重
你可以在查询的时候使用SQL进行去重, 亦可以在取到数据后进行去重
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
# 1 ++++++++++++++++++ 在SQL中去重 ++++++++++++++++++
"""
对应SQL
SELECT DISTINCT article.author_id
FROM article
"""
# -------------- Core
result_core_1 = conn.execute(
select(article_table.c.author_id).distinct()
)
print(result_core_1.all()) # [(1,), (2,)]
# -------------- ORM
result_orm_1 = session.execute(
select(Article.author_id).distinct())
print(result_orm_1.all()) # [(1,), (2,)]
# 2 ++++++++++++++++++ 在结果中去重 ++++++++++++++++++
"""
对应SQL
SELECT article.author_id
FROM article
"""
# -------------- Core
result_core_2 = conn.execute(select(article_table.c.author_id))
print(result_core_2.unique().all()) # [(1,), (2,)]
# -------------- ORM
result_orm_2 = session.execute(select(Article.author_id))
print(result_orm_2.unique().all()) # [(1,), (2,)]
连接查询
有内连接查询
外连接查询
完全连接查询
三种连接查询方式, 在SQLAlchemy中分别对应着join(...)
join(..., isouter=True)
join(..., full=True)
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
# 1 ++++++++++++++++++ 内连接查询 (自动推断join的表) 方式一 ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""
# -------------- Core
result_core_1 = conn.execute(
select(user_table.c.username, article_table.c.title).join_from(user_table, article_table))
print(result_core_1.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
# -------------- ORM
result_orm_1 = session.execute(
select(User.username, Article.title).join_from(User, Article))
print(result_orm_1.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
# 2 ++++++++++++++++++ 内连接查询 (自动推断join的表) 方式二 ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""
# -------------- Core
result_core_2 = conn.execute(
select(user_table.c.username, article_table.c.title).join(article_table))
print(result_core_2.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
# -------------- ORM
result_orm_2 = session.execute(select(User.username, Article.title).join(Article))
print(result_orm_2.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
# 3 ++++++++++++++++++ 内连接查询 (手动指定join的表) ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""
# -------------- Core
result_core_3 = conn.execute(
select(user_table.c.username, article_table.c.title).select_from(user_table).join(article_table))
print(result_core_3.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
# -------------- ORM
result_orm_3 = session.execute(
select(User.username, Article.title).select_from(User).join(Article))
print(result_orm_3.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
# 4 ++++++++++++++++++ 内连接查询 (手动指定on的条件) ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""
# -------------- Core
result_core_4 = conn.execute(
select(user_table.c.username, article_table.c.title).select_from(
user_table).join(article_table, user_table.c.uid == article_table.c.author_id))
print(result_core_4.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
# -------------- ORM
result_orm_4 = session.execute(
select(User.username, Article.title).select_from(
User).join(Article, User.uid == Article.author_id))
print(result_orm_4.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
# 5 ++++++++++++++++++ 外连接查询 ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password, article.title
FROM user LEFT OUTER JOIN article ON user.uid = article.author_id
"""
# -------------- Core
result_core_5 = conn.execute(
select(user_table, article_table.c.title).join(article_table, isouter=True))
print(result_core_5.all())
# [(1, '张三', '12345', 'C入门'), (1, '张三', '12345', 'C++入门'), (2, '李四', '12346', 'python入门')]
# -------------- ORM
result_orm_5 = session.execute(
select(User, Article.title).join(Article, isouter=True))
print(result_orm_5.all()) # [(<User 张三>, 'C入门'), (<User 张三>, 'C++入门'), (<User 李四>, 'python入门')]
# 6 ++++++++++++++++++ 完全连接查询 ++++++++++++++++++
# # !!! 注意: MYSQL 中没有 FULL OUTER JOIN, 执行时会报错
"""
对应SQL
SELECT user.uid, user.username, user.password, article.title
FROM user LEFT OUTER JOIN article ON user.uid = article.author_id
"""
# -------------- Core
result_core_6 = conn.execute(
select(user_table, article_table.c.title).join(article_table, full=True))
print(result_core_6.all())
# -------------- ORM
result_orm_6 = session.execute(
select(User, Article.title).join(Article, full=True))
print(result_orm_6.all())
比如:
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""
result_orm = session.execute(
select(User.username, Article.title).select_from(User).join(User.article))
print(result_orm.all()) # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
UNION和UNION ALL
UNION ALL
: 合并两个或多个SELECT
语句的结果, 结果不会去重UNION
: 合并两个或多个SELECT
语句的结果, 结果会去重
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, union_all, union
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
# 1 ++++++++++++++++++ UNION ALL ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid = 1
UNION ALL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid = 2
"""
# -------------- Core
result_core_1_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_core_1_stmt2 = select(user_table).where(user_table.c.uid == 2)
result_core_1_u = union_all(result_core_1_stmt1, result_core_1_stmt2)
result_core_1 = conn.execute(result_core_1_u)
print(result_core_1.all()) # [(1, '张三', '12345'), (2, '李四', '12346')]
# -------------- ORM
result_orm_1_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_orm_1_stmt2 = select(user_table).where(user_table.c.uid == 2)
result_orm_1_u = union_all(result_orm_1_stmt1, result_orm_1_stmt2)
result_orm_1 = session.execute(result_orm_1_u)
print(result_orm_1.all()) # [(1, '张三', '12345'), (2, '李四', '12346')]
# 2 ++++++++++++++++++ UNION ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid = 1
UNION
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid = 1
"""
# -------------- Core
result_core_2_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_core_2_stmt2 = select(user_table).where(user_table.c.uid == 1)
result_core_2_u = union(result_core_2_stmt1, result_core_2_stmt2)
result_core_2 = conn.execute(result_core_2_u)
print(result_core_2.all()) # [(1, '张三', '12345')]
# -------------- ORM
result_orm_2_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_orm_2_stmt2 = select(user_table).where(user_table.c.uid == 1)
result_orm_2_u = union(result_orm_2_stmt1, result_orm_2_stmt2)
result_orm_2 = session.execute(result_orm_2_u)
print(result_orm_2.all()) # [(1, '张三', '12345')]
子查询
即形如: SELECT * FROM data WHERE name IN (SELECT name FROM user);
和SELECT * FROM data WHERE EXISTS (SELECT name FROM user);
的查询
或使用WITH temp AS (...)
作为临时表
注意: 在使用子查询后,SQL语句的查询性能变得非常糟糕, 至于如何取舍看个人权衡了
from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, func
conn = engine.connect() # engine 在上面定义
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
# 1 ++++++++++++++++++ EXISTS 子查询 ++++++++++++++++++
"""
对应SQL
SELECT user.username
FROM user
WHERE EXISTS (SELECT count(article.author_id) AS count_1
FROM article GROUP BY article.author_id
HAVING count(article.author_id) >= 2)
"""
# -------------- Core
subquery_core_1 = (select(func.count(article_table.c.author_id)).group_by(
article_table.c.author_id).having(
func.count(article_table.c.author_id) >= 2)).exists()
result_core_1 = conn.execute(select(user_table.c.username).where(subquery_core_1))
print(result_core_1.all()) # [('张三',), ('李四',)]
# -------------- ORM
subquery_orm_1 = (select(func.count(Article.author_id)).group_by(
Article.author_id).having(
func.count(Article.author_id) >= 2)).exists()
result_orm_1 = session.execute(select(User.username).where(subquery_orm_1))
print(result_orm_1.all()) # [('张三',), ('李四',)]
# 2 ++++++++++++++++++ 其它 子查询 ++++++++++++++++++
"""
对应SQL
SELECT article.title, anon_1.username
FROM article, (SELECT user.uid AS uid, user.username AS username
FROM user
WHERE user.uid = 1) AS anon_1
WHERE article.author_id = anon_1.uid
"""
# -------------- Core
subquery_core_2 = select(user_table.c.uid, user_table.c.username).where(
user_table.c.uid == 1).subquery()
result_core_2 = conn.execute(
select(article_table.c.title, subquery_core_2.c.username).where(
article_table.c.author_id == subquery_core_2.c.uid))
print(result_core_2.all()) # [('C++入门', '张三'), ('C入门', '张三')]
# # -------------- ORM
subquery_orm_2 = select(User.uid, User.username).where(User.uid == 1).subquery()
result_orm_2 = session.execute(
select(Article.title, subquery_orm_2.c.username).where(
Article.author_id == subquery_orm_2.c.uid))
print(result_orm_2.all()) # [('C++入门', '张三'), ('C入门', '张三')]
# 3 ++++++++++++++++++ with 添加临时表 ++++++++++++++++++
"""
对应SQL
WITH anon_1 AS
(SELECT user.uid AS uid, user.username AS username
FROM user
WHERE user.uid = 1)
SELECT article.title, anon_1.username
FROM article, anon_1
WHERE article.author_id = anon_1.uid
"""
# -------------- Core
subquery_core_3 = select(user_table.c.uid, user_table.c.username).where(
user_table.c.uid == 1).cte()
result_core_3 = conn.execute(
select(article_table.c.title, subquery_core_3.c.username).where(
article_table.c.author_id == subquery_core_3.c.uid))
print(result_core_3.all()) # [('C++入门', '张三'), ('C入门', '张三')]
# -------------- ORM
subquery_orm_3 = select(User.uid, User.username).where(User.uid == 1).cte()
result_orm_3 = session.execute(
select(Article.title, subquery_orm_3.c.username).where(
Article.author_id == subquery_orm_3.c.uid))
print(result_orm_3.all()) # [('C++入门', '张三'), ('C入门', '张三')]
从1.x迁移到2.0的接口
一些Query
的接口用起来还是特别好用的, 因此在2.0风格的接口中, 一些接口仍然可以使用
get根据主键查询
# ---------------- 1.x
session.query(User).get(42)
# ---------------- 2.0
session.get(User, 42)
filter_by简单查询
result = session.execute(
select(User).filter_by(username="张三")
)
print(result.all()) # [(<User 张三>,)]
filter复杂查询
result = session.execute(
select(User).filter(User.username == "张三")
)
print(result.all()) # [(<User 张三>,)]
1.x与2.0的ORM接口对比
以下表格来自于: 2.0 Migration - ORM Usage
一些类的介绍
Result
表示从数据库中返回的结果, 一行数据使用Row
对象表示, 关于Row, 见: Row
from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker
# 导入公共基类
Base = declarative_base()
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
with Session() as session:
session_res = session.execute(
text("select tid, name from teacher;")
)
# <class 'sqlalchemy.engine.cursor.CursorResult'>
print(type(session_res))
with engine.connect() as conn:
conn_res = conn.execute(
text("select tid, name from teacher;")
)
# <class 'sqlalchemy.engine.cursor.CursorResult'>
print(type(conn_res))
注意: 例子中的teacher
表的数据为:
+-----+----------+
| tid | name |
+-----+----------+
| 1 | 语文老师 |
| 2 | 英语老师 |
| 3 | 数学老师 |
+-----+----------+
Result的全部方法
unique(strategy=None)
去重, 但需要注意何时调用, 应该在调用如.all()
这种生成Row
的方法之前调用, 否则返回的对象都没有unique
这个方法from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res1 = session.execute( text("select tid, name from teacher;") ) session_res2 = session.execute( text("select tid, name from teacher;") ) session_res = session_res1.merge(session_res2) # [1, 2, 3, 1, 2, 3] ==> [1, 2, 3] print(session_res.scalars().unique().all())
all
返回所有Row数据的列表, 之后的调用将返回一个空列表from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res = session.execute( text("select tid, name from teacher;") ) # ---------------------------------- 第一次调用all first_all = session_res.all() print(type(session_res.all())) # <class 'list'> for row in first_all: print(type(row)) # <class 'sqlalchemy.engine.row.Row'> print(f"{row.tid}-{row.name}") # ---------------------------------- 第二次调用all print(session_res.all()) # []
fetchall()
与all
方法一样from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res = session.execute( text("select tid, name from teacher;") ) first_fetchall = session_res.fetchall() second_fetchall = session_res.fetchall() # [(1, '语文老师'), (2, '英语老师'), (3, '数学老师')] print(first_fetchall) # [] print(second_fetchall)
fetchmany(size=None)
取多行数据,size
表示取多少行数据 , 当所有行都用完时, 返回一个空列表from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res = session.execute( text("select tid, name from teacher;") ) # 一共3行数据 # 取两行数据: [(1, '语文老师'), (2, '英语老师')] print(session_res.fetchmany(2)) # 取一行数据: [(3, '数学老师')] print(session_res.fetchmany(1)) # 没有数据了, 返回空列表: [] print(session_res.fetchmany(1))
fetchone()
取一行数据, 当所有行都用完时,返回None
from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res = session.execute( text("select tid, name from teacher;") ) while True: row = session_res.fetchone() if not row: break print(row) """ (1, '语文老师') (2, '英语老师') (3, '数学老师') """
first()
获取第一行数据,关闭Result并丢弃其余行, 如果没有行,则不获取 (即返回值为None
)from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res = session.execute( text("select tid, name from teacher;") ) # (1, '语文老师') print(session_res.first()) # !!! 由于Result已经关闭, 继续操作会报错: # sqlalchemy.exc.ResourceClosedError: This result object is closed. print(session_res.first())
one()
只返回一行数据或引发异常, 并关闭Result (无数据时抛出:sqlalchemy.exc.NoResultFound
, 多行数据时抛出:sqlalchemy.exc.MultipleResultsFound
)from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res = session.execute( text("select tid, name from teacher where tid=1;") ) # (1, '语文老师') print(session_res.one())
one_or_none()
最多返回一行数据或引发异常, 并关闭Result (无数据时返回None
, 多行数据时抛出:sqlalchemy.exc.MultipleResultsFound
)from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res1 = session.execute( text("select tid, name from teacher where tid < 1;") ) # None print(session_res1.one_or_none()) session_res2 = session.execute( text("select tid, name from teacher where tid = 1;") ) # (1, '语文老师') print(session_res2.one_or_none())
columns(*col_expressions)
限制返回列, 也可以对列进行重新排序
即假如结果的列为(a, b, c, d), 但我只需要a和b, 那么只需要result.columns("a", "b")
, 你还可以调整它们的顺序, 以方便解包
注意: 这会修改Result的列, 且该方法的返回值就是修改后的Result对象from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res1 = session.execute( text("select tid, name from teacher;") ) session_res2 = session.execute( text("select tid, name from teacher;") ) for row in session_res1.columns("tid"): # 返回值时修改后的Result # 获取字段名元组 print(row._fields) # ('tid',) # 会修改原Result session_res2.columns("name") for row in session_res2: # 获取字段名元组 print(row._fields) # ('name',)
scalar()
获取第一行的第一列数据, 并关闭Result. 如果没有要获取的行, 则返回None
如:[(1, "lczmx"), (2, "jack")]
, 返回1
from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res = session.execute( text("select tid, name from teacher;") ) print(session_res.scalar()) # 1
scalar_one()
只返回一行数据的第一列或引发异常, 并关闭Result (无数据时抛出:sqlalchemy.exc.NoResultFound
, 多行数据时抛出:sqlalchemy.exc.MultipleResultsFound
)
即Result.one()
+Result.scalar()
from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res = session.execute( text("select tid, name from teacher where tid=1;") ) print(session_res.scalar_one()) # 1
scalar_one_or_none()
最多返回一行数据的第一列或引发异常, 并关闭Result (无数据时返回None
, 多行数据时抛出:sqlalchemy.exc.MultipleResultsFound
)
即Result.one_or_none()
+Result.scalar()
from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res1 = session.execute( text("select tid, name from teacher where tid<1;") ) print(session_res1.scalar_one_or_none()) # None session_res2 = session.execute( text("select tid, name from teacher where tid=1;") ) print(session_res2.scalar_one_or_none()) # 1
scalars(index=0)
返回一个ScalarResult
对象, 该对象以每行数据的 第index
列 元素作为数据 (而不是Result
的Row
)
该对象的方法有:unique
partitions
fetchall
fetchmany
all
first
one_or_none
one
具体使用与Result
类似from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res = session.execute( text("select tid, name from teacher;") ) # [1, 2, 3] print(session_res.scalars().all())
mappings()
返回一个MappingResult
对象,MappingResult
对象与Result
对象类似, 但是一行数据使用RowMapping
对象表示,RowMapping
对象类似于字典对象, 简而言之: 调用该方法, 你可以将一行数据由类元组变为类字典from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res = session.execute( text("select tid, name from teacher;") ) for d in session_res.mappings(): # 像操作字典一样操作 d 即可 print(d.get("tid"), d.get("name"))
keys()
从SQLAlchemy1.4起 (之前的版本返回一个列表), 该方法将返回一个RMKeyView
对象, 该对象可迭代, 其_keys
属性存放列的名称, 由于实现的__contains__
方法, 因此也可以使用in
运算符作判断.from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker from collections import Iterable # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res = session.execute( text("select tid, name from teacher;") ) keys = session_res.keys() print(keys) # RMKeyView(['tid', 'name']) # <class 'sqlalchemy.engine.result.RMKeyView'> print(type(keys)) # 可迭代的 print(isinstance(keys, Iterable)) # True if "name" in keys: print("name in keys")
freeze()
可以对Result
进行缓存, 见官方文档: Re-Executing Statementsmerge(*others)
该方法合并其他Result
, 返回一个MergedResult
对象, 你可以像一般的Result
一样操作它, 但是取值的时候注意游标的位置(MergedResult
关闭,Result
也关闭;MergedResult
取完了值,Result
的值也被取完)from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res1 = session.execute( text("select tid, name from teacher where tid=1;") ) session_res2 = session.execute( text("select tid, name from teacher where tid=2;") ) session_res = session_res2.merge(session_res1) # 注意 先session_res2再session_res1 # (2, '英语老师') print(session_res.fetchone()) # [(1, '语文老师')] print(session_res1.all()) # session_res已经取过一次 # 所以返回: [] print(session_res2.all())
partitions(size=None)
迭代生成size
大小的行的子列表,size
为None
时调用Result.yield_per()
, 否则调用Result.fetchmany(size)
from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res1 = session.execute( text("select tid, name from teacher;") ) # 每次迭代 取 1行 数据 for i in session_res1.partitions(): print(i) """ [(1, '语文老师')] [(2, '英语老师')] [(3, '数学老师')] """ session_res2 = session.execute( text("select tid, name from teacher;") ) # 每次迭代 取 2行 数据 for i in session_res2.partitions(2): print(i) """ [(1, '语文老师'), (2, '英语老师')] [(3, '数学老师')] """ # 已经迭代完了, 就没有值了 print(list(session_res2.partitions())) # []
yield_per(num)
迭代num
行数据, 返回的是Result
对象from sqlalchemy import create_engine, text from sqlalchemy.orm import declarative_base, sessionmaker # 导入公共基类 Base = declarative_base() # 数据库配置 DATABASE_CONFIG = { "username": "root", "password": "123456", "host": "localhost", "database": "test" } # 连接mysql engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG), echo=True, future=True) Session = sessionmaker(bind=engine) with Session() as session: session_res = session.execute( text("select tid, name from teacher;") ) # [(1, '语文老师'), (2, '英语老师'), (3, '数学老师')] print(session_res.yield_per(1).all())
close
关闭此Result
, 再操作的话会抛出异常:sqlalchemy.exc.ResourceClosedError: This result object is closed.
Row
一般来说, 一行数据是一个Row
对象
常用的属性或方法:
一般使用:
注意: 我们每一次迭代Result对象, 得到的是Row对象
通过属性取值
result = conn.execute(text("select x, y from some_table"))
for row in result:
y = row.y
# illustrate use with Python f-strings
print(f"Row: {row.x} {row.y}")
通过元组解包
result = conn.execute(text("select x, y from some_table"))
for x, y in result:
# ...
通过索引取值
result = conn.execute(text("select x, y from some_table"))
for row in result:
x = row[0]
MetaData
MetaData
是包含Table
和Engine
的对象, 也就是说它主要是用来管理Table
(表)的
下面列出MetaData
对象的一些方法
内置函数
常用的SQL函数:
使用例子:
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column, Integer, String
from sqlalchemy import select, func
# 导入公共基类
Base = declarative_base()
# 数据库配置
DATABASE_CONFIG = {
"username": "root",
"password": "123456",
"host": "localhost",
"database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
echo=True, future=True)
Session = sessionmaker(bind=engine)
class Teacher(Base):
__tablename__ = "teacher"
tid = Column("tid", Integer, primary_key=True, autoincrement=True)
name = Column("name", String(10), nullable=False, comment="教师名")
with Session() as session:
session_res = session.execute(
select(func.count()).select_from(Teacher)
)
# (3,)
print(session_res.one())
Column定义
一个Column
即表的一列数据, 和我们用SQL语句定义一列数据一样, 参数主要包括: 字段名、字段类型、约束条件, 比如:
from sqlalchemy import Column, String
Column("name", String(30), unique=True, nullable=False, comment='姓名')
字段类型, 一般你可以用直接指定数据库的字段类型, 也可以让SQLAlchemy的DDL自动选择字段类型
直接使用数据库的字段类型
自动转化的字段类型
约束条件
Column
的例子:
from sqlalchemy import DateTime, func
class Data(Base):
__tablename__ = 'data'
id = Column(Integer, primary_key=True, index=True, autoincrement=True)
created_at = Column(DateTime, server_default=func.now(), comment='创建时间')
updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新时间')