其实在2.0风格中, 主要受到影响的是ORM的查询方式, 详情见文档: 2.0 Migration - ORM Usage

安装

pip install sqlalchemy

检测sqlalchemy版本:

>>>import sqlalchemy
>>>sqlalchemy.__version__
'1.4.27'

使用步骤

一般来说SQLAlchemy的使用方式有两种: CoreORM
两种有什么不同呢?

  1. ORM是构建在Core之上的
  2. Core更加底层, 可以执行直接执行SQL语句
  3. ORM类似于Django的ORM, 由于sqlalchemy提供了一套接口, 所以不需要我们直接写SQL语句 (1.x版本)
  4. 至于要用哪个, 等到你用到时, 你会知道的

组件依赖关系图:
SQLAlchemy完全入门-LMLPHP

Core

一般来说, 使用步骤如下:

  1. 配置数据库连接
  2. 建立连接
  3. 创建表
  4. 执行SQL语句, 按需开启事件是否自动提交
  5. 拿到返回数据, 执行其他代码

数据库的连接的格式

我们在创建引擎(连接)时, 需要指定数据库的URL, URL格式, 见: Engine Configuration
, 总的来说, 格式就是: dialect[+driver]://user:password@host/dbname[?key=value..]

  • dialect 数据库名称(方言): 如mysql
  • driver 连接数据库的库: 如: pymysql
  • user 用户名
  • password 密码
  • host 地址
  • dbname 数据库名称
  • key=value 指的是给数据库的参数

如下面的URL:

mysql+pymysql://root:[email protected]:3306/test_db?charset=utf8

建立连接

调用sqlalchemy.create_engine方法, 为了兼容2.0风格的接口, 可以加上future参数. 至于什么是2.0风格的接口, 可以看看官方文档: 2.0 style
create_engine有几个参数需要我们注意:

  • url 即数据库url, 其格式见上文: 数据库的连接的格式
  • echo参数为True时, 将会将engine的SQL记录到日志中 ( 默认输出到标准输出)
  • echo_poolTrue时,会将连接池的记录信息输出
  • future 使用2.0样式EngineConnection API

更多参数见官方文档: sqlalchemy.create_engine

例子

from sqlalchemy import create_engine

# 兼容2.0的写法
# 返回对象不一样
engine1 = create_engine("sqlite+pysqlite:///:memory:", echo=True, future=True)
print(type(engine1))
# <class 'sqlalchemy.future.engine.Engine'>

engine2 = create_engine("sqlite+pysqlite:///:memory:", echo=True)
print(type(engine2))
# <class 'sqlalchemy.engine.base.Engine'>

创建表

我们想要让数据库创建一个表, 需要利用MetaData对象, 关于一些常用的MetaData方法, 见: MetaData
除了要MetaData对象外, 我们还需要Table对象, 用于定义一个表的结构
Table的一般使用

mytable = Table("mytable", metadata,
    Column('mytable_id', Integer, primary_key=True),
    Column('value', String(50))
    )

Table的参数:

  • name 表名称
  • metadata 该表所属的MetaData对象
  • 其他参数: 通过Column指定一列数据, 格式见: Column定义

例子:

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, text

#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
metadata_obj = MetaData()

user_table = Table(
    "user_account",
    metadata_obj,
    Column('id', Integer, primary_key=True),
    Column("username", String(30)))  # String也可以不实例化

# 第二个表
address_table = Table(
    "address",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    # 定义外键
    Column("uid", ForeignKey("user_account.id"), nullable=False),
    Column('email_address', String(32), nullable=False)
)
# 相当于执行 CREATE TABLE 语句
metadata_obj.create_all(engine)

"""
-- 相当于:
CREATE TABLE user_account (
    id INTEGER NOT NULL AUTO_INCREMENT,
    username VARCHAR(30),
    PRIMARY KEY (id)
);
CREATE TABLE address (
    id INTEGER NOT NULL AUTO_INCREMENT,
    uid INTEGER NOT NULL,
    email_address VARCHAR(32) NOT NULL,
    PRIMARY KEY (id),
    FOREIGN KEY(uid) REFERENCES user_account (id)
)
"""

Table的一些属性

# ---------- 访问所有列
# .c  => Column
print(user_table.c.keys())
# ['id', 'username']

# ---------- 访问某一列
print(repr(user_table.c.username))
# Column('username', String(length=30), table=<user>)

# ---------- 返回主键
print(user_table.primary_key)
# 隐式生成
# PrimaryKeyConstraint(Column('id', Integer(), table=<user>, primary_key=True, nullable=False))

在事务中执行SQL

通常, 我们通过调用engine.connectengine.begin方法开始一个事件
sqlalchemy使用事务有两种风格commit as you goBegin once, 前者需要我们手动提交, 后者会自动提交

手动提交

engine.connect方法符合python的上下文管理协议, 会返回一个Connection对象, 该方法会在不手动提交的情况下回滚.举个例子:

from sqlalchemy import create_engine
from sqlalchemy import text

#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

with engine.connect() as conn:
    # 执行
    result = conn.execute(text("select 'hello world'")) # text 可以使用SQL语句
    print(result.all())
    # conn.commit()
    # [('hello world',)]

    # 最后会ROLLBACK

上面的代码中, 相当于开启了事务, 由于最后没有调用commit方法, 所以会回滚.

自动提交

engine.begin方法也符合python的上下文管理协议, 只要执行时不报错就会自动提交, 报错时会回滚.

from sqlalchemy import create_engine
from sqlalchemy import text

#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

with engine.begin() as conn:
    result = conn.execute(text("select 'hello world'"))
    print(result.all())
    # [('hello world',)]

    # COMMIT

绑定参数

上面在事务中执行SQL语句时, 我们用到了sqlalchemy.text, 可以直接定义文本SQL字符串
为了避免被SQL注入, 故在需要传入参数的场景中需要根据sqlalchemy的方式传入, 而不是直接拼接成字符串.
使用:y的格式定义参数, 且将值以字典的形式传给execute

from sqlalchemy import create_engine
from sqlalchemy import text

#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

with engine.begin() as conn:
    result = conn.execute(text("select name from userinfo where name like :y"), {"y": "lcz%"})
    print(result.all())
    # [('lczmx',)]

    # COMMIT

多个参数时, 可以这样

with engine.connect() as conn:
    conn.execute(
        text("INSERT INTO userinfo (id, name) VALUES (:x, :y)"),
        [{"x": 1, "y": "lcmx"}, {"x": 2, "y": "xxx"}])
    conn.commit()

这种方式也可以

stmt = text("SELECT x, y FROM some_table WHERE y > :y ORDER BY x, y").bindparams(y=6)

with engine.connect() as conn:
    conn.execute(stmt)
    conn.commit()

增删改查

处理使用text直接执行SQL外, 你还可以使用其他语法增删改查数据
假如表结构如下:

$show create table address;
+---------+-----------------------------------------+
| Table   | Create Table                            |
+---------+-----------------------------------------+
| address | CREATE TABLE `address` (
  `id` int NOT NULL AUTO_INCREMENT,
  `uid` int NOT NULL,
  `email_address` varchar(32) NOT NULL,
  PRIMARY KEY (`id`),
  KEY `uid` (`uid`),
  CONSTRAINT `address_ibfk_1` FOREIGN KEY (`uid`) REFERENCES `user_account` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=gbk |
+---------+------------------------------------------+


$show create table user_account;
+--------------+------------------------------------+
| Table        | Create Table                       |
+--------------+------------------------------------+
| user_account | CREATE TABLE `user_account` (
  `id` int NOT NULL AUTO_INCREMENT,
  `username` varchar(30) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=gbk |
+--------------+-------------------------------------+
1 row in set (0.00 sec)


插入数据

使用insert(...).values(...)形式为数据库插入数据

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, insert

#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".format(**DATABASE_CONFIG),
                       echo=True, future=True)
metadata_obj = MetaData()

user_table = Table(
    "user_account",
    metadata_obj,
    Column('id', Integer, primary_key=True),
    Column("username", String(30)))  # String也可以不实例化

# 第二个表
address_table = Table(
    "address",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("uid", ForeignKey("user_account.id"), nullable=False),
    Column('email_address', String(32), nullable=False)
)

metadata_obj.create_all(bind=engine)

with engine.connect() as conn:
    # 插入一条普通数据
    conn.execute(insert(user_table).values(id=1, username="lczmx"))
    # 插入外键等数据
    conn.execute(insert(address_table).values(uid=1, email_address="[email protected]"))

    # 自动生成value, 不需要我们手动指定

    conn.execute(insert(user_table),
                 [{"username": "张三"},
                  {"username": "李四"},
                  {"username": "王五"},
                  {"username": "赵六"},
                  ])

    conn.commit()

SQLAlchemy还提供了更复杂的用法, 见: Inserting Rows with Core

删除数据

使用delete(...).where(...)的形式删除数据

目前的表数据:

select u.id as uid, u.username, a.id as aid, a.email_address as email_address
from user_account as u
left join  address as a on u.id=a.uid;
+-----+----------+------+-------------------+
| uid | username | aid  | email_address     |
+-----+----------+------+-------------------+
|   1 | lczmx    |    1 | [email protected] |
|   2 | 张三     | NULL | NULL              |
|   3 | 李四     | NULL | NULL              |
|   4 | 王五     | NULL | NULL              |
|   5 | 赵六     | NULL | NULL              |
+-----+----------+------+-------------------+

例子:

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, delete

#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".format(**DATABASE_CONFIG),
                       echo=True, future=True)
metadata_obj = MetaData()

user_table = Table(
    "user_account",
    metadata_obj,
    Column('id', Integer, primary_key=True),
    Column("username", String(30)))  # String也可以不实例化

# 第二个表
address_table = Table(
    "address",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("uid", ForeignKey("user_account.id"), nullable=False),
    Column('email_address', String(32), nullable=False)
)

metadata_obj.create_all(bind=engine)

with engine.connect() as conn:
    # 一般删除
    # user_table.c 获取的是 列数据
    result1 = conn.execute(delete(user_table).where(user_table.c.id == 3))
    print(f"受影响行数: {result1.rowcount}")  # 受影响行数: 1

    # and 删除
    result2 = conn.execute(delete(user_table).where(user_table.c.username == "张三", user_table.c.id == 2))
    print(f"受影响行数: {result2.rowcount}")  # 受影响行数: 1

    conn.commit()

更多见: The delete() SQL Expression Construct

更新数据

使用update(...).where(...).values(...)的形式更新数据

select u.id as uid, u.username, a.id as aid, a.email_address as email_address
from user_account as u
left join  address as a on u.id=a.uid;

+-----+----------+------+-------------------+
| uid | username | aid  | email_address     |
+-----+----------+------+-------------------+
|   1 | lczmx    |    1 | [email protected] |
|   2 | 张三     | NULL | NULL              |
|   3 | 李四     | NULL | NULL              |
|   4 | 王五     | NULL | NULL              |
|   5 | 赵六     | NULL | NULL              |
+-----+----------+------+-------------------+

例子:

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, update, bindparam, select

#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".
                       format(**DATABASE_CONFIG), echo=True, future=True)
metadata_obj = MetaData()

user_table = Table(
    "user_account",
    metadata_obj,
    Column('id', Integer, primary_key=True),
    Column("username", String(30)))  # String也可以不实例化

# 第二个表
address_table = Table(
    "address",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("uid", ForeignKey("user_account.id"), nullable=False),
    Column('email_address', String(32), nullable=False)
)

metadata_obj.create_all(bind=engine)

with engine.connect() as conn:
    # 一般更新
    result1 = conn.execute(update(user_table).where(
        user_table.c.username == "王五").values(username="王老五"))
    print(f"受影响行数: {result1.rowcount}")  # 受影响行数: 1

    # 更新数据 加上 原来的数据
    result2 = conn.execute(
        update(user_table).where(user_table.c.username == "赵六").values(
            username=user_table.c.username + "一号"))
    print(f"受影响行数: {result2.rowcount}")  # 受影响行数: 1

    # 以字典的形式, 替换更新多个值
    result3 = conn.execute(
        update(user_table).where(user_table.c.username == bindparam('old_name')).values(
            username=bindparam('new_name')),
        [
            {"old_name": "张三", "new_name": "新张三"},
            {"old_name": "李四", "new_name": "新李四"},
        ]
    )

    print(f"受影响行数: {result3.rowcount}")  # 受影响行数: 2

    # 以 子查询 的方式 更新数据
    scalar_subq = (
        select(address_table.c.email_address).
            where(address_table.c.uid == user_table.c.id).
            order_by(address_table.c.id).
            limit(1).
            scalar_subquery()
    )
    # 将email_address的值 赋给 username
    update(user_table).values(username=scalar_subq)

    """
    -- 以上查询, 相当于:
    UPDATE user_account SET username=(SELECT address.email_address
    FROM address
    WHERE address.uid = user_account.id ORDER BY address.id
    LIMIT :param_1)
    """
    conn.commit()

修改后的结果:

+-----+----------+------+-------------------+
| uid | username | aid  | email_address     |
+-----+----------+------+-------------------+
|   1 | lczmx    |    1 | [email protected] |
|   2 | 新张三   | NULL | NULL              |
|   3 | 新李四   | NULL | NULL              |
|   4 | 王老五   | NULL | NULL              |
|   5 | 赵六一号 | NULL | NULL              |
+-----+----------+------+-------------------+

更多见: Updating and Deleting Rows with Core

查询数据

由于2.0的查询方式, Core和ORM都可以使用, 所以放在一起, 见下文: 查询数据详解

处理查询返回的数据

我们执行conn.execute方法的结果为: CursorResult对象
其本质上是继承与Result对象, 其使用方式见: Result

例子:
假如查询的表:

mysql> select * from user_account;
+----+----------+
| id | username |
+----+----------+
|  9 | lczmx    |
| 10 | jack     |
| 11 | tom      |
| 12 | mike     |
+----+----------+
4 rows in set (0.00 sec)

mysql>

利用SQLAlchemy获取数据:

from sqlalchemy import MetaData, Table, Column, Integer, String, ForeignKey
from sqlalchemy import create_engine, text

#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

with engine.connect() as conn:
    # 执行
    result = conn.execute(text("select * from user_account;"))

    for row in result.all():
        # 使用f-strings 格式化字符串
        print(f"id: {row.id:3}, username: {row.username:20}")
    # 打印的结果:
    """
    id:   9, username: lczmx
    id:  10, username: jack
    id:  11, username: tom
    id:  12, username: mike
    """
    conn.commit()

ORM

和Core一样, ORM也有一定的使用步骤:

  1. 配置数据库连接, 见上文: 数据库的连接的格式
  2. 创建会话
  3. 创建表
  4. 使用接口, 增删改查数据
  5. 拿到返回数据, 执行其他代码

在学习SQLAlcehmy的ORM之前, 建议先了解一些概念, 以免后面会混淆

  1. 会话 Session
    会话是SQLAlchemy ORM与数据库的交互对象
    它可以管理建立连接engine, 并为通过会话加载或与会话关联的对象提供标识映射 (identity map)
    在使用时与Connection非常相似, 你可以对比着使用

  2. Base
    通过sqlalchemy.orm.declarative_base创建
    作为定义表的基类, 内部有包含MetaData对象
    可以类似于Django一样定义表

SQLAlchemy中, session是一个连接池, 的由其管理, 因此, 假如我们需要操作数据库的话, 需要在session中拿到Connection(连接)

创建会话

SQLAlchemy提供了两种创建会话的方法:

  1. sqlalchemy.orm.Session
    from sqlalchemy import create_engine
    from sqlalchemy.orm import Session
    
    # 创建引擎
    engine = create_engine('postgresql://scott:tiger@localhost/')
    
    # 创建会话
    # 以下with可以简写成 with Session(engine) as session, session.begin():
    with Session(engine) as session:
        # 开启自动提交
        with session.begin():
            # add方法 会将some_object 保存到数据库
            # session.add(some_object)
            # session.add(some_other_object)
            pass
    
    
  2. sqlalchemy.orm.sessionmaker
    from sqlalchemy import create_engine
    from sqlalchemy.orm import sessionmaker
    
    # 创建引擎
    engine = create_engine('postgresql://scott:tiger@localhost/')
    
    # 创建session
    Session = sessionmaker(engine)
    
    # 一般使用
    with Session() as session:
        # session.add(some_object)
        # session.add(some_other_object)
        # 提交
        session.commit()
    
    # 自动提交
    with Session.begin() as session:
        # session.add(some_object)
        # session.add(some_other_object)
        pass
    
    

虽然有两种方法创建会话, 但我们一般使用sessionmaker创建会话

另外补充一下session的其它使用方式:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine('postgresql://scott:tiger@localhost/')

Session = sessionmaker(engine)

# 从连接指定到session
with engine.connect() as connection:
    with Session(bind=connection) as session:
        # 一些操作
        pass

下面列出session的一些常用方法, 增删改查数据时要用到

例子:

from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)

stmt = text("SELECT id, name FROM userinfo WHERE id > :y").bindparams(y=1)
with Session(engine) as session:
    result = session.execute(stmt)
    print(result.all())
    # [(2, 'name2'), (3, 'name2')]

    # ROLLBACK

在ORM中创建表

使用ORM时, 我们也需要MetaData, 不同的是, 我们是通过sqlalchemy.orm.registry构造的. 而且, 我们不需要像Core那样直接声明Table, 而是继承某个公共基类 (Base), 添加属性即可. 有两种方式定义基类.
方式一:

from sqlalchemy.orm import registry
mapper_registry = registry()
print(mapper_registry.metadata)  # MetaData对象
# 公共基类
Base = mapper_registry.generate_base()

方法二:

from sqlalchemy.orm import declarative_base

# 内部 return registry(...).generate_base(...)
Base = declarative_base()

现在你可以像在Django ORM中一样, 定义表并在数据库中创建表, 每一个Column表示一列数据, 关于Column的写法, 见: Column定义

from sqlalchemy import Column, String, Integer, create_engine, SMALLINT, Boolean, ForeignKey
from sqlalchemy.orm import relationship, declarative_base, sessionmaker

# 导入公共基类
Base = declarative_base()
#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)


class Student(Base):
    __tablename__ = "student"
    sid = Column("sid", Integer, primary_key=True)
    name = Column("name", String(32), nullable=False, index=True, comment="姓名")
    age = Column("age", SMALLINT, nullable=False, comment="年龄")
    gender = Column("gender", Boolean, nullable=False, comment="性别, True: 男, False: 女")


class Course(Base):
    __tablename__ = "course"
    cid = Column("cid", Integer, primary_key=True)
    name = Column("name", String(10), nullable=False, comment="科目名")
    tid = Column("tid", ForeignKey("teacher.tid"), comment="课程教师")


class Teacher(Base):
    __tablename__ = "teacher"
    tid = Column("tid", Integer, primary_key=True)
    name = Column("name", String(10), nullable=False, comment="教师名")


class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成绩")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")


Base.metadata.create_all(bind=engine)

"""
-- 对于sql
CREATE TABLE student (
    sid INTEGER NOT NULL AUTO_INCREMENT,
    name VARCHAR(32) NOT NULL COMMENT '姓名',
    age SMALLINT NOT NULL COMMENT '年龄',
    gender BOOL NOT NULL COMMENT '性别, True: 男, False: 女',
    PRIMARY KEY (sid)
)
CREATE TABLE teacher (
    tid INTEGER NOT NULL AUTO_INCREMENT,
    name VARCHAR(10) NOT NULL COMMENT '教师名',
    PRIMARY KEY (tid)
)
CREATE TABLE course (
    cid INTEGER NOT NULL AUTO_INCREMENT,
    name VARCHAR(10) NOT NULL COMMENT '科目名',
    tid INTEGER COMMENT '课程教师',
    PRIMARY KEY (cid),
    FOREIGN KEY(tid) REFERENCES teacher (tid)
)
CREATE TABLE score (
    sid INTEGER NOT NULL AUTO_INCREMENT,
    score SMALLINT NOT NULL COMMENT '成绩',
    student_id INTEGER COMMENT '成绩所属学生',
    course_id INTEGER COMMENT '成绩所属科目',
    PRIMARY KEY (sid),
    FOREIGN KEY(student_id) REFERENCES student (sid),
    FOREIGN KEY(course_id) REFERENCES course (cid)
)

"""

增删改查数据

插入数据

接上文 "在ORM中创建表" 中的表

1.x的接口与2.0的接口一样, 都是调用session.add(instance)方法添加到数据库 (add方法下次刷新操作时, 将instance保存到数据库)
注意: 自动生成的数据, 在未插入到数据库之前, 都为None, 如: 自动生成的主键

from sqlalchemy import Column, String, Integer, create_engine, SMALLINT, Boolean, ForeignKey
from sqlalchemy.orm import relationship, declarative_base, sessionmaker
from sqlalchemy.orm import Session
from typing import Any

# 导入公共基类
Base = declarative_base()
#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)


class Student(Base):
    __tablename__ = "student"
    sid = Column("sid", Integer, primary_key=True)
    name = Column("name", String(32), nullable=False, index=True, comment="姓名")
    age = Column("age", SMALLINT, nullable=False, comment="年龄")
    gender = Column("gender", Boolean, nullable=False, comment="性别, True: 男, False: 女")


class Course(Base):
    __tablename__ = "course"
    cid = Column("cid", Integer, primary_key=True)
    name = Column("name", String(10), nullable=False, comment="科目名")
    tid = Column("tid", ForeignKey("teacher.tid"), comment="课程教师")


class Teacher(Base):
    __tablename__ = "teacher"
    tid = Column("tid", Integer, primary_key=True)
    name = Column("name", String(10), nullable=False, comment="教师名")


class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成绩")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")


Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)


# 一般将 添加到数据库 封装成一个函数
def create_data(db: Session, target_cls: Any, **kwargs):
    try:
        cls_obj = target_cls(**kwargs)
        # 添加一个
        db.add(cls_obj)
        # 添加多个:
        # db.add_all([obj1, obj2, ...])

        db.commit()
        # 手动将 数据 刷新到数据库
        db.refresh(cls_obj)
        return cls_obj
    except Exception as e:
        # 别忘记发生错误时回滚
        db.rollback()
        raise e


session = SessionLocal()

# -------------- 创建学生数据
student = create_data(session, Student, sid=1, name="张三", age=22, gender=True)

# -------------- 创建教师数据
teacher = create_data(session, Teacher, tid=1, name="语文老师")

# -------------- 创建课程数据
course = create_data(session, Course, cid=1, name="语文", tid=teacher.tid)

# -------------- 创建成绩数据
score = create_data(session, Score, sid=1, score=89, student_id=student.sid, course_id=course.cid)

总的来说, 插入数据代码一般为:

# 1. 实例化一个表类
db_city = CityTable(....)

# 2. 调用session的add方法
session.add(db_city)

# 3. 调用session的commit方法 提交事务
session.commit()

# 4. 手动调用session的refresh方法 将数据刷新到数据库
session.refresh(db_city)

删除数据

1.x的方法

主要步骤是先查询再删除, 一般形式为: session.query(...).filter(...).delete()

from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker

# 导入公共基类
Base = declarative_base()
#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)


class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成绩")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")


Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # 方法一: 调用 session.delete 方法
    s = session.query(Score).filter(Score.score == 59).first()
    session.delete(s)

    # 方法二: 查询后直接删除
    session.query(Score).filter(Score.score == 59).delete()
    session.commit()

2.0的方法

像Core一样删除数据, 即delte(...).where(...)

from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import select, delete

# 导入公共基类
Base = declarative_base()
#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)


class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成绩")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")


Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    session.execute(
        delete(Score).where(Score.sid == 1)
    )
    session.commit()

修改数据

1.x的方法

主要步骤是先查询再更新, 即: session.query(...).filter(...).update(...)

from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker

# 导入公共基类
Base = declarative_base()
#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)


class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成绩")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")


Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    row = session.query(Score).filter(Score.score == 59).update({"score": 60})

    print(f"修改的行数: {row}")
    session.commit()

2.0的方法

同样和Core一样, 使用update(...).where(...).values(...)的形式更新数据

from sqlalchemy import Column, Integer, create_engine, SMALLINT, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import update, bindparam

# 导入公共基类
Base = declarative_base()
#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)


class Score(Base):
    __tablename__ = "score"
    sid = Column("sid", Integer, primary_key=True)
    score = Column("score", SMALLINT, nullable=False, comment="成绩")
    student_id = Column("student_id", ForeignKey("student.sid"), comment="成绩所属学生")
    course_id = Column("course_id", ForeignKey("course.cid"), comment="成绩所属科目")


Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # 一般更新
    result1 = session.execute(update(Score).where(Score.score == 59).values(score=60))
    print(f"受影响行数: {result1.rowcount}")

    # # 更新数据 加上 原来的数据
    result2 = session.execute(
        update(Score).where(Score.score == 59).values(score=Score.score + 1))
    print(f"受影响行数: {result2.rowcount}")  # 受影响行数: 1

    # 以字典的形式, 替换更新多个值
    result3 = session.execute(
        update(Score).where(Score.score == bindparam('old_score')).values(score=bindparam('new_score')),
        [
            {"old_score": 59, "new_score": 60},
        ]
    )

    print(f"受影响行数: {result3.rowcount}")

    session.commit()

查询数据

1.x的方法

在SQLAlchemy1.x查询方式中, 使用Query对象进行查询, 类似于Django ORM的管理器, 可以较为简单地查询数据
假如要查询的表如下:

class User(Base):
    __tablename__ = "User"  # 设置表名
    uid = Column(Integer, primary_key=True)
    username = Column(String(80), unique=True)
    email = Column(String(120), unique=True)
    tags = Column(String(120))

    def __repr__(self):
        return '<User %r>' % self.username

表的数据:

mysql> select * from User;
+-----+----------+-------------------+------+
| uid | username | email             | tags |
+-----+----------+-------------------+------+
|   1 | 张三     | [email protected]  | 热情 |
|   2 | 李四     | [email protected]       | 热情 |
|   3 | 王五     | [email protected]     | 开朗 |
|   4 | lczmx    | [email protected] | 热情 |
+-----+----------+-------------------+------+
4 rows in set (0.10 sec)

mysql>

使用例子:

from sqlalchemy import Column, Integer, create_engine, String
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import not_, or_, desc

# 导入公共基类
Base = declarative_base()
#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)


class User(Base):
    __tablename__ = "User"  # 设置表名
    uid = Column(Integer, primary_key=True)
    username = Column(String(80), unique=True)
    email = Column(String(120), unique=True)
    tags = Column(String(120))

    def __repr__(self):
        return '<User %r>' % self.username


Base.metadata.create_all(bind=engine)

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # ------------------  查询所有User数据
    session.query(User).all()
    """
    对应SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    """

    # ------------------  查询有多少条数据
    session.query(User).count()
    """
    对应SQL
    SELECT count(*) AS count_1 FROM (SELECT `User`.uid AS `User_uid`,
    `User`.username AS `User_username`, `User`.email AS `User_email`,
    `User`.tags AS `User_tags` FROM `User`) AS anon_1
    """

    # ------------------  查询第1条数据
    session.query(User).first()
    """
    对应SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    LIMIT 1
    """

    # ------------------  根据主键查询
    session.query(User).get(1)
    """
    对应SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE `User`.uid = 1
    """

    # ------------------  简单查询, 使用 关键字实参 的形式来设置字段名
    session.query(User).filter_by(uid=1).all()
    """
    对应SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE `User`.uid = 1
    """

    # ------------------  复杂查询, 可以多个表一起,使用 恒等式'==' 等形式 来设置条件
    session.query(User).filter(User.uid == 1).all()
    """
    对应SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE `User`.uid = 1
    """

    # ------------------  filter 查询开头
    session.query(User).filter(User.username.startswith("l")).all()
    """
    对应SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE (`User`.username LIKE concat('l', '%%'))
    """

    # ------------------  filter 查询结尾
    session.query(User).filter(User.username.endswith("x")).all()
    """
    对应SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE (`User`.username LIKE concat('%%', 'x'))
    """

    # ------------------  filter 查询是否包含
    session.query(User).filter(User.username.contains("lcz")).all()
    """
    对应SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE (`User`.username LIKE concat(concat('%%', "lcz", '%%')))
    """

    # ------------------  filter 模糊查询
    session.query(User).filter(User.username.like("%cz%")).all()
    """
    对应SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE `User`.username LIKE "%cz%"
    """

    # ------------------  filter 条件取反 (not)
    session.query(User).filter(not_(User.username == "lczmx")).all()
    """
    对应SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE `User`.username != "lczmx"
    """

    # ------------------  filter条件 或 (or), 默认为and
    session.query(User).filter(
        or_(User.uid == 1, User.uid == 3), ).all()
    """
    对应SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE `User`.uid = 1 OR `User`.uid = 3
    """

    # ------------------  filter条件 and or not 一起使用
    session.query(User).filter(or_(User.uid == 1, User.uid == 4), User.username == "lczmx",
                               not_(User.email == "[email protected]")).all()
    """
    对应SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE (`User`.uid = 1 OR `User`.uid = 4)
    AND `User`.username = "lczmx" AND `User`.email = "[email protected]"
    """

    # ------------------   filter 取反查询
    session.query(User).filter(User.username != "lczmx").all()
    """
    对应SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE `User`.username != "lczmx";
    """

    # ------------------  查询uid为[1, 3, 5, 7, 9]的用户
    session.query(User).filter(User.uid.in_([1, 3, 5, 7, 9])).all()
    """
    对应SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    WHERE `User`.uid IN (1, 3, 5, 7, 9)
    """

    # ------------------  分组查询
    # !! 注意不是query(User), 因为Query(User)对应的SQL为:
    # SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    # `User`.email AS `User_email`, `User`.tags AS `User_tags`
    session.query(User.tags).group_by(User.tags).all()
    """
    对应SQL
    SELECT `User`.tags AS `User_tags` FROM `User` GROUP BY `User`.tags
    """

    # ------------------  排序 顺序
    session.query(User).order_by(User.uid).all()
    """
    对应SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags`
    FROM `User` ORDER BY `User`.uid
    """

    # ------------------  排序 倒序
    session.query(User).order_by(desc(User.uid)).all()
    """
    对应SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    ORDER BY `User`.uid DESC
    """

    # ------------------ 去重
    session.query(User.tags).distinct().all()
    """
    对应SQL:
    SELECT DISTINCT `User`.tags AS `User_tags` FROM `User`;
    """

    # ------------------ 取几条数据
    session.query(User).limit(2).all()
    """
    对应SQL:
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    LIMIT 2;
    """

    # ------------------ 跳过几条个数据
    session.query(User).offset(1).limit(2).all()
    """
    对应SQL
    SELECT `User`.uid AS `User_uid`, `User`.username AS `User_username`,
    `User`.email AS `User_email`, `User`.tags AS `User_tags` FROM `User`
    LIMIT 1, 2;
    """

关于query返回的对象
query的返回对象为sqlalchemy.orm.query.Query对象, 你可以与Result对象进行对比, 主要有以下的方法:

  • all()
    返回由表对象组成的列表
  • first()
    返回第一个结果 (表对象), 内部执行limit SQL
  • one()
    只返回一行数据或引发异常 (无数据时抛出: sqlalchemy.exc.NoResultFound, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)
  • one_or_none()
    最多返回一行数据或引发异常 (无数据时返回None, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)
  • scalar()
    获取第一行的第一列数据. 如果没有要获取的行, 则返回None, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound

2.0的方法

2.0的返回结果也是Result对象, 关于Result对象, 见: Result

注意: 由于2.0的查询方式, Core和ORM都可以使用, 所以放在一起, 见下文: 查询数据详解

relationship连表操作

我们自定义外键时, 一般的步骤是:

  1. 子表使用字段名= Column(Integer, ForeignKey('主表名.主键'))的格式定义
  2. 除此外, 还需要在主表中定义relationship用于子表与主表之间的跨表查询

完整例子:

from sqlalchemy import Column, Integer, create_engine, String, Text, ForeignKey
from sqlalchemy.orm import declarative_base, sessionmaker, relationship

# 导入公共基类
Base = declarative_base()
#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)


class User(Base):
    __tablename__ = 'user'
    uid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(16), nullable=False)
    password = Column(String(32), nullable=False)
    # Article是类名 user是反向访问的属性名称
    article = relationship("Article", backref="user")
    """
    article = relationship("Article", backref="user")
    相当于:
    class User(Base):
        # Article是类名 user是反向访问的属性名称
        article = relationship("Article", back_populates="user")

    class Article(Base):
        # User是类名 addresses是反向访问的属性名称
        user = relationship("User", back_populates="addresses")

    """

    def __repr__(self):
        return "<User %s>" % self.username


class Article(Base):
    __tablename__ = 'article'
    aid = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(36), nullable=False)
    content = Column(Text, nullable=False)
    author_id = Column(Integer, ForeignKey("user.uid"))

    def __repr__(self):
        return "<User %s>" % self.title


Base.metadata.create_all(bind=engine)

关于relationship
relationship在定义外键时, 有非常重要的作用, 如: 1. 跨表操作 2. 设置删除主表数据时子表的值
relationship的参数很多, 这里只列出常用的几个, 全部参数见文档: relationship

  • back_populates
    指定反向访问的属性名称

  • backref
    快捷设置两个relationship (设置back_populates的话, 要设置两个表)
    SQLAlchemy完全入门-LMLPHP

  • cascade
    用于控制修改数据时的选项

    比如:

    articles = relationship("Article",cascade="save-update,delete")
    
  • order_by
    子表列表的排序方式

    # 倒序
    article = relationship("Article", backref="user", order_by="Article.aid.desc()")
    # 正序
    article = relationship("Article", backref="user", order_by="Article.aid")
    

本部分包括Core与ORM的跨表增删改查操作

select `user`.uid as uid, `user`.username, `user`.password,
 (select GROUP_CONCAT(`article`.title) from article
   where `article`.author_id = `user`.uid) as article
 from user;

+-----+----------+----------+---------------+
| uid | username | password | article       |
+-----+----------+----------+---------------+
|   1 | 张三     | 12345    | C++入门,C入门 |
|   2 | 李四     | 12346    | python入门    |
+-----+----------+----------+---------------+

通过relationship双向访问

即直接通过relationship访问主表或子表

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # +++++++++++++++++++++++++ 子表访问主表, 返回主表对象
    """
    实质上内部执行的SQL
    SELECT user.uid AS user_uid, user.username AS user_username, user.password AS user_password
    FROM user
    WHERE user.uid = %(pk_1)s
    """
    # ----------- 1.x方法
    article_1x = session.query(Article).first()
    print(article_1x.user)  # <User 张三>
    # ----------- 2.0 方法
    article_20 = session.execute(select(Article)).first()
    print(article_20.Article.user)  # <User 张三>

    # +++++++++++++++++++++++++ 主表访问子表, 返回子表列表
    # 实质是 sqlalchemy.orm.collections.InstrumentedList 对象
    # 是List的子类
    """
    实质上内部执行的SQL
    SELECT article.aid AS article_aid, article.title AS article_title,
    article.content AS article_content, article.author_id AS article_author_id
    FROM article
    WHERE %(param_1)s = article.author_id ORDER BY article.aid
    """
    # ----------- 1.x方法
    user_1x = session.query(User).first()
    print(user_1x.article)
    # [<User C++入门>, <User C入门>]
    # ----------- 2.0方法
    user_20 = session.execute(select(User)).first()
    print(user_20.User.article)
    # [<User C++入门>, <User C入门>]

通过relationship修改关联关系

上面例子中说过, 主表.relationship字段InstrumentedList对象 (类似于List), 我们可以修改它, 然后调用commit方法即可. 子表.relationship字段是对应的主表, 可以修改为自己想要的, 同样调用commit方法即可

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # +++++++++++++++++++++++++ 子表修改属于的主表
    """
    对于SQL
    UPDATE article
    SET author_id=%(author_id)s
    WHERE article.aid = %(article_aid)s
    """
    # ----------- 1.x方法
    lisi = session.query(User).get(2)

    article_1x = session.query(Article).first()
    print(article_1x.user)  # <User 张三>
    article_1x.user = lisi  # 改为 <User 李四>
    session.commit()  # 记得提交
    # ----------- 2.0 方法
    zhangsan = session.execute(select(User).where(User.uid == 1)).first()

    article_20 = session.execute(select(Article)).first()
    print(article_20.Article.user)  # <User 李四>
    article_20.Article.user = zhangsan.User  # 改为 <User 张三>
    session.commit()  # 记得提交

    # +++++++++++++++++++++++++ 主表修改子表列表

    # ----------- 1.x方法 增加
    user_1x = session.query(User).first()
    # 添加一个新的子表数据
    # 会在Article中插入一条新的数据
    user_1x.article.append(Article(title="javascript 入门", content="console.log(hello world)"))
    session.commit()  # 记得提交

    # ----------- 2.0 方法 移除
    user_20 = session.execute(select(User)).first()
    print(user_20.User.article)  # [<User C++入门>, <User C入门>, <User javascript 入门>]

    article_js = session.execute(select(Article).where(Article.aid == 4)).scalar()
    # 从主表中移除与子表的关系
    user_20.User.article.remove(article_js)
    session.commit()  # 记得提交

通过relationship修改子/主表数据

同样非常简单, 找到对应的类, 然后修改数据并commit即可

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # +++++++++++++++++++++++++ 通过子表修改对应主表的数据
    # ----------- 1.x方法
    article_1x = session.query(Article).first()
    print(article_1x.user)  # <User 张三>

    article_1x.user.username = "张三二号"
    session.commit()  # 记得提交
    # ----------- 2.0 方法

    article_20 = session.execute(select(Article)).first()
    print(article_20.Article.user)  # <User 张三二号>

    article_20.Article.user.username = "张三"
    session.commit()  # 记得提交

    # +++++++++++++++++++++++++ 通过主表修改对应子表的数据
    # ----------- 1.x方法
    user_1x = session.query(User).first()
    print(user_1x.article)  # [<User C++入门>, <User C入门>, <User javascript 入门>]

    user_1x.article[-1].title = "js入门"
    session.commit()  # 记得提交
    # ----------- 2.0 方法
    user_20 = session.execute(select(User)).scalar()

    print(user_20.article)  # [<User C++入门>, <User C入门>, <User js入门>]

    user_20.article[-1].title = "javascript 入门"
    session.commit()  # 记得提交

通过relationship查询数据

正向查询, 子表利用主表的条件查询, 使用has, 条件和普通查询的条件一样
反向查询, 主表利用子表的条件查询, 使用any, 条件和普通查询的条件一样

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)

with SessionLocal() as session:
    # +++++++++++++++++++++++++ 反向查询
    """
    对应的SQL
    SELECT user.uid AS user_uid, user.username AS user_username, user.password AS user_password
    FROM user
    WHERE EXISTS (SELECT 1
    FROM article
    WHERE user.uid = article.author_id AND article.title LIKE 'python%')
    """
    # ********* 查询有以python开头的Article的User
    # ----------- 1.x方法
    user_1x = session.query(User).filter(User.article.any(Article.title.like("python%")))
    print(user_1x.all())  # [<User 李四>]

    # ----------- 2.0方法
    user_20 = session.execute(
        select(User).where(User.article.any(Article.title.like("python%"))))
    print(user_20.all())  # [(<User 李四>,)]

    # +++++++++++++++++++++++++ 正向查询
    """
    对应的SQL
    SELECT article.aid AS article_aid, article.title AS article_title,
     article.content AS article_content, article.author_id AS article_author_id
    FROM article
    WHERE EXISTS (SELECT 1
    FROM user
    WHERE user.uid = article.author_id AND (user.username LIKE concat(concat('%%', '四', '%%')))
    """
    # ********* 查询User表中username有 四 的Article
    # ----------- 1.x方法
    article_1x = session.query(Article).filter(Article.user.has(User.username.contains("四")))
    print(article_1x.all())  # [<User python入门>]

    # ----------- 2.0方法
    article_20 = session.execute(
        select(Article).where(Article.user.has(User.username.contains("四"))))
    print(article_20.all())  # [(<User python入门>,)]

建立多对多关系

可以通过relationship便捷使用多对多关系

from sqlalchemy import Table, Text, Column, ForeignKey

# 第三张表
post_keywords = Table("post_keywords", Base.metadata,
                      Column('post_id', ForeignKey('posts.id'), primary_key=True),
                      Column('keyword_id', ForeignKey('keywords.id'), primary_key=True)
                      )


class BlogPost(Base):
    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True)
    headline = Column(String(255), nullable=False)
    body = Column(Text)

    # 多对多关系 BlogPost<->Keyword
    keywords = relationship('Keyword',
                            secondary=post_keywords,
                            back_populates='posts')

    def __init__(self, headline, body):
        self.headline = headline
        self.body = body

    def __repr__(self):
        return "BlogPost(%r, %r)" % (self.headline, self.body)


class Keyword(Base):
    __tablename__ = 'keywords'
    id = Column(Integer, primary_key=True)
    keyword = Column(String(50), nullable=False, unique=True)
    # 多对多关系 Keyword<->BlogPost
    posts = relationship('BlogPost',
                         secondary=post_keywords,
                         back_populates='keywords')

    def __init__(self, keyword):
        self.keyword = keyword


Base.metadata.create_all(bind=engine)

添加操作例子:

from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
    # 比如添加一篇文章, 为文章添加多个keyword
    blog = BlogPost(headline="起飞!", body="我是文章的内容")
    # 获取子表列表 并 append
    blog.keywords.append(Keyword("新闻"))
    blog.keywords.append(Keyword("热门"))
    session.add(blog)
    session.commit()
    # ------- 添加第二篇文章
    keyword1 = session.execute(
        select(Keyword).filter_by(keyword="热门")
    ).scalar()
    new_blog = BlogPost(headline="震惊!", body="我是第二篇文章的内容")
    new_blog.keywords.append(keyword1)
    session.add(new_blog)
    session.commit()

查询操作例子:

from sqlalchemy import select

SessionLocal = sessionmaker(bind=engine)
with SessionLocal() as session:
    # 查询所有 "热门" 文章
    blog = session.execute(
        select(BlogPost).where(BlogPost.keywords.any(Keyword.keyword == "热门"))
    ).all()

    print(blog)
    # [(BlogPost('起飞!', '我是文章的内容'),),
    # (BlogPost('震惊!', '我是第二篇文章的内容'),)]

项目示范

一般来说, 我们使用SQLAlchemy的步骤大多相同, 下面
文件结构:

+--- database.py  # 用于 初始化session 和 公共基类
+--- models.py   # 定义表
+--- crud.py     # 封装增删改查的方法
+--- schemas.py  # 定义pydantic模型, 用于格式化已经取得的数据 [可选]
+--- main.py    # 执行主逻辑

database.py 初始化session和 公共基类:

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

# 数据库的URL
# 替换成自己的
SQLALCHEMY_DATABASE_URL = 'sqlite:///./coronavirus.sqlite3'

# 创建引擎
engine = create_engine(
    # echo=True表示引擎将用repr()函数记录所有语句及其参数列表到日志
    SQLALCHEMY_DATABASE_URL, encoding='utf-8', echo=True
)

# SessionLocal用于对数据的增删改查
# flush()是指发送数据库语句到数据库,但数据库不一定执行写入磁盘;commit()是指提交事务,将变更保存到数据库文件
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, expire_on_commit=True)

# 创建基本映射类, 用于创建表
Base = declarative_base(bind=engine, name='Base')

models.py 定义表结构

from sqlalchemy import Column, String, Integer, BigInteger, Date, DateTime, ForeignKey, func
from sqlalchemy.orm import relationship
# 导入公共基类
from .database import Base


class City(Base):
    __tablename__ = 'city'

    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    province = Column(String(100), unique=True, nullable=False, comment='省/直辖市')
    country = Column(String(100), nullable=False, comment='国家')
    country_code = Column(String(100), nullable=False, comment='国家代码')
    country_population = Column(BigInteger, nullable=False, comment='国家人口')
    data = relationship('Data', back_populates='city')  # 'Data'是关联的类名;back_populates来指定反向访问的属性名称

    created_at = Column(DateTime, server_default=func.now(), comment='创建时间')
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新时间')

    __mapper_args__ = {"order_by": country_code}  # 默认是正序,倒序加上.desc()方法

    def __repr__(self):
        return f'{self.country}_{self.province}'


class Data(Base):
    __tablename__ = 'data'

    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    city_id = Column(Integer, ForeignKey('city.id'), comment='所属省/直辖市')  # ForeignKey里的字符串格式不是类名.属性名,而是表名.字段名
    date = Column(Date, nullable=False, comment='数据日期')
    confirmed = Column(BigInteger, default=0, nullable=False, comment='确诊数量')
    deaths = Column(BigInteger, default=0, nullable=False, comment='死亡数量')
    recovered = Column(BigInteger, default=0, nullable=False, comment='痊愈数量')
    city = relationship('City', back_populates='data')  # 'City'是关联的类名;back_populates来指定反向访问的属性名称

    created_at = Column(DateTime, server_default=func.now(), comment='创建时间')
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新时间')

    __mapper_args__ = {"order_by": date.desc()}  # 按日期降序排列

    def __repr__(self):
        return f'{repr(self.date)}:确诊{self.confirmed}例'

crud.py 封装增删改查的方法:

from sqlalchemy.orm import Session

import models
import schemas


def get_city(db: Session, city_id: int):
    return db.query(models.City).filter(models.City.id == city_id).first()


def get_city_by_name(db: Session, name: str):
    return db.query(models.City).filter(models.City.province == name).first()


def get_cities(db: Session, skip: int = 0, limit: int = 10):
    return db.query(models.City).offset(skip).limit(limit).all()


def create_city(db: Session, city: schemas.CreateCity):
    """
    创建City数据
    """
    db_city = models.City(**city.dict())
    db.add(db_city)
    db.commit()
    db.refresh(db_city)
    return db_city


def get_data(db: Session, city: str = None, skip: int = 0, limit: int = 10):
    if city:
        # 外键关联查询,这里不是像Django ORM那样Data.city.province
        return db.query(models.Data).filter(models.Data.city.has(province=city))
    return db.query(models.Data).offset(skip).limit(limit).all()


def create_city_data(db: Session, data: schemas.CreateData, city_id: int):
    """
    创建Data数据
    """
    db_data = models.Data(**data.dict(), city_id=city_id)
    db.add(db_data)
    db.commit()
    db.refresh(db_data)
    return db_data

schemas.py 定义pydantic模型, 用于格式化已经取得的数据:

from datetime import date as date_
from datetime import datetime

from pydantic import BaseModel


class CreateData(BaseModel):
    date: date_
    confirmed: int = 0
    deaths: int = 0
    recovered: int = 0


class CreateCity(BaseModel):
    province: str
    country: str
    country_code: str
    country_population: int


class ReadData(CreateData):
    id: int
    city_id: int
    updated_at: datetime
    created_at: datetime

    class Config:
        orm_mode = True


class ReadCity(CreateCity):
    id: int
    updated_at: datetime
    created_at: datetime

    class Config:
        orm_mode = True

main.py 执行主逻辑:

from sqlalchemy.orm import Session

import crud
import schemas

from database import engine, Base, SessionLocal
from models import City, Data

# 创建表, 已经存在的将被忽略
Base.metadata.create_all(bind=engine)
db = SessionLocal()
# 调用 crud
db_city = crud.get_city_by_name(db, name="广东省")
if db_city:
    raise Exception("City already registered")

# 创建数据
city = City(...)
crud.create_city(db=db, city=city)

查询数据详解

只有涉及SQL 的查询数据, 必然绕不开FROM WHERE SELECT GROUP BY HAVING ORDER BY LIMIT INNER JOIN ... ON LEFT JOIN ... ON UNION 这些SQL查询语法, 那么它们在SQLAlchemy中是如何表示的呢? 实际上和SQL语句一样, SQLAlchemy的语法也有select where join order_by group_by having 等 ...

注意: 这些方法在Core与ORM中都适用

刚接触可能会觉得比较复杂, 但是假如有SQL基础的话, 用起来比较简单.
要查询的表结构为:

# +++++++++++++++++++ 使用Core定义 +++++++++++++++++++
from sqlalchemy import MetaData, create_engine
from sqlalchemy import Table, Column, Integer, String, ForeignKey, Text

#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}?charset=utf8".
                       format(**DATABASE_CONFIG), echo=True, future=True)
metadata_obj = MetaData()

user_table = Table(
    "user",
    metadata_obj,
    Column("uid", Integer, primary_key=True, autoincrement=True),
    Column("username", String(16), nullable=False),
    Column("password", String(32), nullable=False),
)

article_table = Table(
    'article',
    metadata_obj,
    Column("aid", Integer, primary_key=True, autoincrement=True),
    Column("title", String(36), nullable=False),
    Column("content", Text, nullable=False),
    Column("author_id", Integer, ForeignKey("user.uid"))
)

metadata_obj.create_all(bind=engine)


# +++++++++++++++++++ 使用ORM定义  +++++++++++++++++++
from sqlalchemy import Column, Integer, create_engine, String, Text, ForeignKey
from sqlalchemy.orm import declarative_base, relationship

# 导入公共基类
Base = declarative_base()
#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".
                       format(**DATABASE_CONFIG), echo=True, future=True)


class User(Base):
    __tablename__ = 'user'
    uid = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(16), nullable=False)
    password = Column(String(32), nullable=False)

    # Article是类名 user是反向访问的属性名称
    article = relationship("Article", backref="user")

    def __repr__(self):
        return "<User %s>" % self.username


class Article(Base):
    __tablename__ = 'article'
    aid = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(36), nullable=False)
    content = Column(Text, nullable=False)
    author_id = Column(Integer, ForeignKey("user.uid"))

    def __repr__(self):
        return "<User %s>" % self.title


Base.metadata.create_all(bind=engine)

表数据为:

mysql> select  * from article, user where user.uid=article.author_id;
+-----+------------+--------------------+-----------+-----+----------+----------+
| aid | title      | content            | author_id | uid | username | password |
+-----+------------+--------------------+-----------+-----+----------+----------+
|   1 | C++入门    | c++ hello world    |         1 |   1 | 张三     | 12345    |
|   2 | C入门      | c hello world      |         1 |   1 | 张三     | 12345    |
|   3 | python入门 | print(hello world) |         2 |   2 | 李四     | 12346    |
+-----+------------+--------------------+-----------+-----+----------+----------+
3 rows in set (0.12 sec)

select

该函数可以指定要查询的表、列及添加别名

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select


conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

# 1 ++++++++++++++++++ 一般查询 ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password FROM user
WHERE user.uid = 2
"""
# -------------- Core
result_core_1 = conn.execute(select(user_table).where(user_table.c.uid == 2))
print(result_core_1.all())  # [(2, '李四', '12346')]
# -------------- ORM
result_orm_1 = session.execute(select(User).where(User.uid == 2))
print(result_orm_1.all())  # [(<User 李四>,)]

# 2 ++++++++++++++++++ 查询全部列 ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
"""
# -------------- Core
result_core_2 = conn.execute(select(user_table))
print(result_core_2.all())  # [(1, '张三', '12345'), (2, '李四', '12346')]
# -------------- ORM
result_orm_2 = session.execute(select(User))
print(result_orm_2.all())  # [(<User 张三>,), (<User 李四>,)]

# 3 ++++++++++++++++++ 查询指定的列 ++++++++++++++++++
"""
对应SQL
SELECT user.username
FROM user
"""
# -------------- Core
result_core_3 = conn.execute(select(user_table.c.username))
print(result_core_3.all())  # [('张三',), ('李四',)]
# -------------- ORM
result_orm_3 = session.execute(select(User.username))
print(result_orm_3.all())  # [('张三',), ('李四',)]

# 4 ++++++++++++++++++ 两个表 联合查询 ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user, article
WHERE user.uid = article.author_id
"""
# -------------- Core
result_core_4 = conn.execute(
    select(user_table.c.username, article_table.c.title).where(
        user_table.c.uid == article_table.c.author_id))
# [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
print(result_core_4.all())
# -------------- ORM
result_orm_4 = session.execute(select(User.username, Article.title).where(
    User.uid == Article.author_id))
# [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
print(result_orm_4.all())

# 5 ++++++++++++++++++ 为列添加别名 ++++++++++++++++++
"""
对应SQL
SELECT user.username AS name
FROM user
WHERE user.uid = 1
"""
# -------------- Core
result_core_5 = conn.execute(
    select((user_table.c.username).label("name")).where(
        user_table.c.uid == 1))
# print(result_core_5.all())  # [('张三',)]
# -------------- ORM
result_orm_5 = session.execute(
    select((User.username).label("name")).where(
        User.uid == 1))
# print(result_orm_5.all())  # [('张三',)]

# 利用别名 取值
print(result_core_5.first().name)  # 张三
print(result_orm_5.first().name)  # 张三

# 6 ++++++++++++++++++ 为表添加别名 ++++++++++++++++++
"""
对应SQL
SELECT user_1.username
FROM user AS user_1
WHERE user_1.uid = 1
"""
# -------------- Core
user_table_core_alias = user_table.alias()
result_core_6 = conn.execute(
    select(user_table_core_alias.c.username).where(
        user_table_core_alias.c.uid == 1))
print(result_core_6.all())  # [('张三',)]
# -------------- ORM
from sqlalchemy.orm import aliased

user_table_orm_alias = aliased(User)
result_orm_6 = session.execute(
    select(user_table_orm_alias.username).where(
        user_table_orm_alias.uid == 1))
print(result_orm_6.all())  # [('张三',)]

# 7 ++++++++++++++++++ 与text 结合, 添加额外列 ++++++++++++++++++
"""
对应SQL
SELECT now() AS now, user.username, '自定义字符'
FROM user
"""

from sqlalchemy import literal_column, text

# literal_column表示一列数据
# text可以转化成SQL

# -------------- Core
result_core_7 = conn.execute(
    select(literal_column("now()").label("now"), user_table.c.username, text("'自定义字符'")))
print(result_core_7.all())
"""
[(datetime.datetime(2022, 1, 8, 18, 54, 44), '张三', '自定义字符'),
(datetime.datetime(2022, 1, 8, 18, 54, 44), '李四', '自定义字符')]
"""
# -------------- ORM
result_orm_7 = session.execute(
    select(literal_column("now()").label("now"), User.username, text("'自定义字符'")))
print(result_orm_7.all())
"""
[(datetime.datetime(2022, 1, 8, 18, 59, 42), '张三', '自定义字符'),
(datetime.datetime(2022, 1, 8, 18, 59, 42), '李四', '自定义字符')]
"""

where

过滤数据, Coretable.c.xxx获取行, 假如是ORM的话为:类名.属性名

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select


conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

# 1 ++++++++++++++++++ 条件默认为and ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid < 3 AND user.uid > 1
"""
# -------------- Core
result_core_1 = conn.execute(
    select(user_table).where(user_table.c.uid < 3, user_table.c.uid > 1))
print(result_core_1.all())  # [(2, '李四', '12346')]
# -------------- ORM
result_orm_1 = session.execute(select(User).where(User.uid < 3, User.uid > 1))
print(result_orm_1.all())  # [(<User 李四>,)]

# 2 ++++++++++++++++++ 修改条件为not or ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE (user.uid = 1 OR user.uid = 2) AND user.username != "李四"
"""
from sqlalchemy import or_, not_

# -------------- Core
result_core_2 = conn.execute(
    select(user_table).where(
        or_(user_table.c.uid == 1, user_table.c.uid == 2),
        not_(user_table.c.username == "李四"),
    ))
print(result_core_2.all())  # [(1, '张三', '12345')]
# -------------- ORM
result_orm_2 = session.execute(select(User).where(
    or_(User.uid == 1, User.uid == 2),
    not_(User.username == "李四")))
print(result_orm_2.all())  # [(<User 张三>,)]

# 3 ++++++++++++++++++ startswith ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE (user.username LIKE concat('张', '%%'))
"""

# -------------- Core
result_core_3 = conn.execute(
    select(user_table).where(
        user_table.c.username.startswith("张")))
print(result_core_3.all())  # [(1, '张三', '12345')]
# -------------- ORM
result_orm_3 = session.execute(select(User).where(
    User.username.startswith("张")))
print(result_orm_3.all())  # [(<User 张三>,)]

# 4 ++++++++++++++++++ endswith ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE (user.username LIKE concat('%%', '三'))
"""

# -------------- Core
result_core_4 = conn.execute(
    select(user_table).where(
        user_table.c.username.endswith("三")))
print(result_core_4.all())  # [(1, '张三', '12345')]
# -------------- ORM
result_orm_4 = session.execute(select(User).where(
    User.username.endswith("三")))
print(result_orm_4.all())  # [(<User 张三>,)]

# 5 ++++++++++++++++++ endswith ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE (user.username LIKE concat(concat('%%', '三', '%%'))
"""

# -------------- Core
result_core_5 = conn.execute(
    select(user_table).where(
        user_table.c.username.contains("三")))
print(result_core_5.all())  # [(1, '张三', '12345')]
# -------------- ORM
result_orm_5 = session.execute(select(User).where(
    User.username.contains("三")))
print(result_orm_5.all())  # [(<User 张三>,)]

# 6 ++++++++++++++++++ like ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.username LIKE '%三'
"""

# -------------- Core
result_core_6 = conn.execute(
    select(user_table).where(
        user_table.c.username.like("%三")))
print(result_core_6.all())  # [(1, '张三', '12345')]
# -------------- ORM
result_orm_6 = session.execute(select(User).where(
    User.username.like("%三")))
print(result_orm_6.all())  # [(<User 张三>,)]

# 7 ++++++++++++++++++ in ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid IN (1, 2)
"""

# -------------- Core
result_core_7 = conn.execute(
    select(user_table).where(
        user_table.c.uid.in_((1, 2))
    ))
print(result_core_7.all())  # [(1, '张三', '12345'), (2, '李四', '12346')]
# -------------- ORM
result_orm_7 = session.execute(
    select(User).where(
        User.uid.in_((1, 2))
    ))
print(result_orm_7.all())  # [(<User 张三>,), (<User 李四>,)]

# 8 ++++++++++++++++++ between ... and ... ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid BETWEEN 1 AND 2
"""

# -------------- Core
result_core_8 = conn.execute(
    select(user_table).where(
        user_table.c.uid.between(1, 2)
    ))
print(result_core_8.all())  # [(1, '张三', '12345'), (2, '李四', '12346')]
# -------------- ORM
result_orm_8 = session.execute(
    select(User).where(
        User.uid.between(1, 2)
    ))
print(result_orm_8.all())  # [(<User 张三>,), (<User 李四>,)]

# 9 ++++++++++++++++++ is null 或 is not null ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid IS NOT NULL
"""

from sqlalchemy import not_

# -------------- Core
result_core_9 = conn.execute(
    select(user_table).where(
        # user_table.c.uid.is_(None),  # IS NULL
        not_(user_table.c.uid.is_(None)),  # IS NOT NULL
    ))
print(result_core_9.all())  # [(1, '张三', '12345'), (2, '李四', '12346')]
# -------------- ORM
result_orm_9 = session.execute(
    select(User).where(
        # User.uid.is_(None),  # IS NULL
        not_(User.uid.is_(None)),  # IS NOT NULL
    ))
print(result_orm_9.all())  # [(<User 张三>,), (<User 李四>,)]

除此之外, 你还可以使用一些其他运算符, 详情见: Operator Reference

order_by

order_by用于排序

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select


conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

# 1 ++++++++++++++++++ 根据某一列排序 (默认升序) ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user ORDER BY user.uid
"""

# -------------- Core
result_core_1 = conn.execute(
    select(user_table).order_by(user_table.c.uid))
print(result_core_1.all())  # [(1, '张三', '12345'), (2, '李四', '12346')]
# -------------- ORM
result_orm_1 = session.execute(
    select(User).order_by(User.uid))
print(result_orm_1.all())  # [(<User 张三>,), (<User 李四>,)]

# 2 ++++++++++++++++++ 手动指定升序/降序 ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user ORDER BY user.uid ASC/DESC
"""

# -------------- Core
result_core_2 = conn.execute(
    # select(user_table).order_by(user_table.c.uid.asc())  # 升序
    select(user_table).order_by(user_table.c.uid.desc())  # 降序
)
print(result_core_2.all())
# [(1, '张三', '12345'), (2, '李四', '12346')] / [(2, '李四', '12346'), (1, '张三', '12345')]
# -------------- ORM
result_orm_2 = session.execute(
    # select(User).order_by(User.uid.asc())  # 升序
    select(User).order_by(User.uid.desc())  # 降序
)
print(result_orm_2.all())
# [(<User 张三>,), (<User 李四>,)] / [(<User 李四>,), (<User 张三>,)]

# 3 ++++++++++++++++++ 根据别名排序 ++++++++++++++++++
"""
对应SQL
SELECT user.username AS name
FROM user ORDER BY name DESC
"""
from sqlalchemy import desc, asc

# -------------- Core
result_core_3 = conn.execute(
    select((user_table.c.username).label("name")).order_by(desc("name"))
)
print(result_core_3.all())  # [('张三',), ('李四',)]

# -------------- ORM
result_orm_3 = session.execute(
    select((User.username).label("name")).order_by(desc("name"))
)
print(result_orm_3.all())  # [('张三',), ('李四',)]

group_by和having

group_by用于分组, having类似于where, 但可以对已分组数据使用聚合函数

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, func


conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

# 1 ++++++++++++++++++ group_by一般使用 ++++++++++++++++++
"""
对应SQL
SELECT count(article.author_id) AS count
FROM article GROUP BY article.author_id
"""

# func 内有很多内置函数

# -------------- Core
result_core_1 = conn.execute(
    select(func.count(article_table.c.author_id).label("count")).group_by(article_table.c.author_id)
)
print(result_core_1.all())  # [(2,), (1,)]

# -------------- ORM
result_orm_1 = session.execute(
    select(func.count(Article.author_id).label("count")).group_by(Article.author_id)
)
print(result_orm_1.all())  # [(2,), (1,)]

# 2 ++++++++++++++++++ group by + having ++++++++++++++++++
"""
对应SQL
SELECT count(article.author_id) AS count
FROM article GROUP BY article.author_id
HAVING count(article.author_id) > 1
"""

# func 内有很多内置函数

# -------------- Core
result_core_2 = conn.execute(
    select(func.count(article_table.c.author_id).label("count")).group_by(
        article_table.c.author_id).having(func.count(article_table.c.author_id) > 1)
)
print(result_core_2.all())  # [(2,)]

# -------------- ORM
result_orm_2 = session.execute(
    select(func.count(Article.author_id).label("count")).group_by(
        Article.author_id).having(func.count(Article.author_id) > 1)
)
print(result_orm_2.all())  # [(2,)]

limit和offset

limit: 表示取几条数据, offset: 表示要跳过多少条数据

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select


conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

# 1 ++++++++++++++++++ 仅使用LIMIT ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
LIMIT 1
"""

# -------------- Core
result_core_1 = conn.execute(select(user_table).limit(1))
print(result_core_1.all())  # [(1, '张三', '12345')]

# -------------- ORM
result_orm_1 = session.execute(select(User).limit(1))
print(result_orm_1.all())  # [(<User 张三>,)]

# 2 ++++++++++++++++++ 使用LIMIT和OFFSET ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
LIMIT 1, 1
"""

# -------------- Core
result_core_2 = conn.execute(select(user_table).limit(1).offset(1))
print(result_core_2.all())  # [(2, '李四', '12346')]

# -------------- ORM
result_orm_2 = session.execute(select(User).limit(1).offset(1))
print(result_orm_2.all())  # [(<User 李四>,)]

去重

你可以在查询的时候使用SQL进行去重, 亦可以在取到数据后进行去重

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select

conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()
# 1 ++++++++++++++++++ 在SQL中去重 ++++++++++++++++++
"""
对应SQL
SELECT DISTINCT article.author_id
FROM article
"""

# -------------- Core
result_core_1 = conn.execute(
    select(article_table.c.author_id).distinct()
)
print(result_core_1.all())  # [(1,), (2,)]
# -------------- ORM
result_orm_1 = session.execute(
    select(Article.author_id).distinct())
print(result_orm_1.all())  # [(1,), (2,)]

# 2 ++++++++++++++++++ 在结果中去重 ++++++++++++++++++
"""
对应SQL
SELECT article.author_id
FROM article
"""

# -------------- Core
result_core_2 = conn.execute(select(article_table.c.author_id))
print(result_core_2.unique().all())  # [(1,), (2,)]

# -------------- ORM
result_orm_2 = session.execute(select(Article.author_id))
print(result_orm_2.unique().all())  # [(1,), (2,)]

连接查询

内连接查询 外连接查询 完全连接查询三种连接查询方式, 在SQLAlchemy中分别对应着join(...) join(..., isouter=True) join(..., full=True)

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select


conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

# 1 ++++++++++++++++++ 内连接查询 (自动推断join的表) 方式一 ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""

# -------------- Core
result_core_1 = conn.execute(
    select(user_table.c.username, article_table.c.title).join_from(user_table, article_table))
print(result_core_1.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
# -------------- ORM
result_orm_1 = session.execute(
    select(User.username, Article.title).join_from(User, Article))
print(result_orm_1.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]

# 2 ++++++++++++++++++ 内连接查询 (自动推断join的表) 方式二 ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""

# -------------- Core
result_core_2 = conn.execute(
    select(user_table.c.username, article_table.c.title).join(article_table))
print(result_core_2.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
# -------------- ORM
result_orm_2 = session.execute(select(User.username, Article.title).join(Article))
print(result_orm_2.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]

# 3 ++++++++++++++++++ 内连接查询 (手动指定join的表) ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""

# -------------- Core
result_core_3 = conn.execute(
    select(user_table.c.username, article_table.c.title).select_from(user_table).join(article_table))
print(result_core_3.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
# -------------- ORM
result_orm_3 = session.execute(
    select(User.username, Article.title).select_from(User).join(Article))
print(result_orm_3.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]

# 4 ++++++++++++++++++ 内连接查询 (手动指定on的条件) ++++++++++++++++++
"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""

# -------------- Core
result_core_4 = conn.execute(
    select(user_table.c.username, article_table.c.title).select_from(
        user_table).join(article_table, user_table.c.uid == article_table.c.author_id))
print(result_core_4.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]
# -------------- ORM
result_orm_4 = session.execute(
    select(User.username, Article.title).select_from(
        User).join(Article, User.uid == Article.author_id))
print(result_orm_4.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]

# 5 ++++++++++++++++++ 外连接查询 ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password, article.title
FROM user LEFT OUTER JOIN article ON user.uid = article.author_id
"""

# -------------- Core
result_core_5 = conn.execute(
    select(user_table, article_table.c.title).join(article_table, isouter=True))
print(result_core_5.all())
# [(1, '张三', '12345', 'C入门'), (1, '张三', '12345', 'C++入门'), (2, '李四', '12346', 'python入门')]
# -------------- ORM
result_orm_5 = session.execute(
    select(User, Article.title).join(Article, isouter=True))
print(result_orm_5.all())  # [(<User 张三>, 'C入门'), (<User 张三>, 'C++入门'), (<User 李四>, 'python入门')]

# 6 ++++++++++++++++++ 完全连接查询 ++++++++++++++++++
# # !!! 注意: MYSQL 中没有  FULL OUTER JOIN, 执行时会报错
"""
对应SQL
SELECT user.uid, user.username, user.password, article.title
FROM user LEFT OUTER JOIN article ON user.uid = article.author_id
"""

# -------------- Core
result_core_6 = conn.execute(
    select(user_table, article_table.c.title).join(article_table, full=True))
print(result_core_6.all())

# -------------- ORM
result_orm_6 = session.execute(
    select(User, Article.title).join(Article, full=True))
print(result_orm_6.all())

比如:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select


conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

"""
对应SQL
SELECT user.username, article.title
FROM user INNER JOIN article ON user.uid = article.author_id
"""

result_orm = session.execute(
    select(User.username, Article.title).select_from(User).join(User.article))
print(result_orm.all())  # [('张三', 'C++入门'), ('张三', 'C入门'), ('李四', 'python入门')]

UNION和UNION ALL

UNION ALL: 合并两个或多个SELECT语句的结果, 结果不会去重
UNION: 合并两个或多个SELECT语句的结果, 结果会去重

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, union_all, union


conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

# 1 ++++++++++++++++++ UNION ALL ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid = 1
UNION ALL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid = 2
"""

# -------------- Core
result_core_1_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_core_1_stmt2 = select(user_table).where(user_table.c.uid == 2)
result_core_1_u = union_all(result_core_1_stmt1, result_core_1_stmt2)

result_core_1 = conn.execute(result_core_1_u)
print(result_core_1.all())  # [(1, '张三', '12345'), (2, '李四', '12346')]

# -------------- ORM
result_orm_1_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_orm_1_stmt2 = select(user_table).where(user_table.c.uid == 2)
result_orm_1_u = union_all(result_orm_1_stmt1, result_orm_1_stmt2)

result_orm_1 = session.execute(result_orm_1_u)
print(result_orm_1.all())  # [(1, '张三', '12345'), (2, '李四', '12346')]

# 2 ++++++++++++++++++ UNION ++++++++++++++++++
"""
对应SQL
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid = 1
UNION
SELECT user.uid, user.username, user.password
FROM user
WHERE user.uid = 1
"""

# -------------- Core
result_core_2_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_core_2_stmt2 = select(user_table).where(user_table.c.uid == 1)
result_core_2_u = union(result_core_2_stmt1, result_core_2_stmt2)

result_core_2 = conn.execute(result_core_2_u)
print(result_core_2.all())  # [(1, '张三', '12345')]

# -------------- ORM
result_orm_2_stmt1 = select(user_table).where(user_table.c.uid == 1)
result_orm_2_stmt2 = select(user_table).where(user_table.c.uid == 1)
result_orm_2_u = union(result_orm_2_stmt1, result_orm_2_stmt2)

result_orm_2 = session.execute(result_orm_2_u)
print(result_orm_2.all())  # [(1, '张三', '12345')]

子查询

即形如: SELECT * FROM data WHERE name IN (SELECT name FROM user);
SELECT * FROM data WHERE EXISTS (SELECT name FROM user);的查询
或使用WITH temp AS (...) 作为临时表
注意: 在使用子查询后,SQL语句的查询性能变得非常糟糕, 至于如何取舍看个人权衡了

from sqlalchemy.orm import sessionmaker
from sqlalchemy import select, func


conn = engine.connect()  # engine 在上面定义

SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

# 1 ++++++++++++++++++ EXISTS 子查询 ++++++++++++++++++
"""
对应SQL
SELECT user.username
FROM user
WHERE EXISTS (SELECT count(article.author_id) AS count_1
FROM article GROUP BY article.author_id
HAVING count(article.author_id) >= 2)
"""

# -------------- Core
subquery_core_1 = (select(func.count(article_table.c.author_id)).group_by(
    article_table.c.author_id).having(
    func.count(article_table.c.author_id) >= 2)).exists()

result_core_1 = conn.execute(select(user_table.c.username).where(subquery_core_1))
print(result_core_1.all())  # [('张三',), ('李四',)]

# -------------- ORM
subquery_orm_1 = (select(func.count(Article.author_id)).group_by(
    Article.author_id).having(
    func.count(Article.author_id) >= 2)).exists()

result_orm_1 = session.execute(select(User.username).where(subquery_orm_1))
print(result_orm_1.all())  # [('张三',), ('李四',)]

# 2 ++++++++++++++++++ 其它 子查询 ++++++++++++++++++
"""
对应SQL
SELECT article.title, anon_1.username
FROM article, (SELECT user.uid AS uid, user.username AS username
FROM user
WHERE user.uid = 1) AS anon_1
WHERE article.author_id = anon_1.uid
"""

# -------------- Core
subquery_core_2 = select(user_table.c.uid, user_table.c.username).where(
    user_table.c.uid == 1).subquery()

result_core_2 = conn.execute(
    select(article_table.c.title, subquery_core_2.c.username).where(
        article_table.c.author_id == subquery_core_2.c.uid))
print(result_core_2.all())  # [('C++入门', '张三'), ('C入门', '张三')]

# # -------------- ORM
subquery_orm_2 = select(User.uid, User.username).where(User.uid == 1).subquery()

result_orm_2 = session.execute(
    select(Article.title, subquery_orm_2.c.username).where(
        Article.author_id == subquery_orm_2.c.uid))
print(result_orm_2.all())  # [('C++入门', '张三'), ('C入门', '张三')]

# 3 ++++++++++++++++++ with 添加临时表 ++++++++++++++++++
"""
对应SQL
WITH anon_1 AS
(SELECT user.uid AS uid, user.username AS username
FROM user
WHERE user.uid = 1)
 SELECT article.title, anon_1.username
FROM article, anon_1
WHERE article.author_id = anon_1.uid
"""

# -------------- Core
subquery_core_3 = select(user_table.c.uid, user_table.c.username).where(
    user_table.c.uid == 1).cte()

result_core_3 = conn.execute(
    select(article_table.c.title, subquery_core_3.c.username).where(
        article_table.c.author_id == subquery_core_3.c.uid))
print(result_core_3.all())  # [('C++入门', '张三'), ('C入门', '张三')]

# -------------- ORM
subquery_orm_3 = select(User.uid, User.username).where(User.uid == 1).cte()

result_orm_3 = session.execute(
    select(Article.title, subquery_orm_3.c.username).where(
        Article.author_id == subquery_orm_3.c.uid))
print(result_orm_3.all())  # [('C++入门', '张三'), ('C入门', '张三')]

从1.x迁移到2.0的接口

一些Query的接口用起来还是特别好用的, 因此在2.0风格的接口中, 一些接口仍然可以使用

get根据主键查询

# ---------------- 1.x
session.query(User).get(42)
# ---------------- 2.0
session.get(User, 42)

filter_by简单查询

result = session.execute(
    select(User).filter_by(username="张三")
)
print(result.all())  # [(<User 张三>,)]

filter复杂查询

result = session.execute(
    select(User).filter(User.username == "张三")
)
print(result.all())  # [(<User 张三>,)]

1.x与2.0的ORM接口对比

以下表格来自于: 2.0 Migration - ORM Usage
SQLAlchemy完全入门-LMLPHP
SQLAlchemy完全入门-LMLPHP

一些类的介绍

Result

表示从数据库中返回的结果, 一行数据使用Row对象表示, 关于Row, 见: Row

from sqlalchemy import create_engine, text
from sqlalchemy.orm import declarative_base, sessionmaker

# 导入公共基类
Base = declarative_base()
# 数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)

with Session() as session:
    session_res = session.execute(
        text("select tid, name from teacher;")
    )
    # <class 'sqlalchemy.engine.cursor.CursorResult'>
    print(type(session_res))

with engine.connect() as conn:
    conn_res = conn.execute(
        text("select tid, name from teacher;")
    )
    # <class 'sqlalchemy.engine.cursor.CursorResult'>
    print(type(conn_res))

注意: 例子中的teacher表的数据为:

+-----+----------+
| tid | name     |
+-----+----------+
|   1 | 语文老师 |
|   2 | 英语老师 |
|   3 | 数学老师 |
+-----+----------+

Result的全部方法

  • unique(strategy=None)
    去重, 但需要注意何时调用, 应该在调用如.all()这种生成Row的方法之前调用, 否则返回的对象都没有unique这个方法

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res1 = session.execute(
            text("select tid, name from teacher;")
        )
        session_res2 = session.execute(
            text("select tid, name from teacher;")
        )
    
        session_res = session_res1.merge(session_res2)
    
        # [1, 2, 3, 1, 2, 3] ==> [1, 2, 3]
        print(session_res.scalars().unique().all())
    
    
  • all
    返回所有Row数据的列表, 之后的调用将返回一个空列表

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
       "username": "root",
       "password": "123456",
       "host": "localhost",
       "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                          echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
       session_res = session.execute(
           text("select tid, name from teacher;")
       )
    
       # ---------------------------------- 第一次调用all
       first_all = session_res.all()
       print(type(session_res.all()))  # <class 'list'>
       for row in first_all:
           print(type(row))  # <class 'sqlalchemy.engine.row.Row'>
           print(f"{row.tid}-{row.name}")
    
       # ---------------------------------- 第二次调用all
       print(session_res.all())  # []
    
    
  • fetchall()
    all方法一样

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher;")
        )
        first_fetchall = session_res.fetchall()
        second_fetchall = session_res.fetchall()
        # [(1, '语文老师'), (2, '英语老师'), (3, '数学老师')]
        print(first_fetchall)
        # []
        print(second_fetchall)
    
    
  • fetchmany(size=None)
    取多行数据, size表示取多少行数据 , 当所有行都用完时, 返回一个空列表

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher;")
        )
        # 一共3行数据
    
        # 取两行数据: [(1, '语文老师'), (2, '英语老师')]
        print(session_res.fetchmany(2))
    
        # 取一行数据: [(3, '数学老师')]
        print(session_res.fetchmany(1))
    
        # 没有数据了, 返回空列表: []
        print(session_res.fetchmany(1))
    
  • fetchone()
    取一行数据, 当所有行都用完时,返回None

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
       "username": "root",
       "password": "123456",
       "host": "localhost",
       "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                          echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
       session_res = session.execute(
           text("select tid, name from teacher;")
       )
       while True:
           row = session_res.fetchone()
           if not row:
               break
           print(row)
           """
           (1, '语文老师')
           (2, '英语老师')
           (3, '数学老师')
           """
    
    
  • first()
    获取第一行数据,关闭Result并丢弃其余行, 如果没有行,则不获取 (即返回值为None)

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher;")
        )
    
        # (1, '语文老师')
        print(session_res.first())
    
        # !!! 由于Result已经关闭, 继续操作会报错:
        # sqlalchemy.exc.ResourceClosedError: This result object is closed.
        print(session_res.first())
    
    
  • one()
    只返回一行数据或引发异常, 并关闭Result (无数据时抛出: sqlalchemy.exc.NoResultFound, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher where tid=1;")
        )
    
        # (1, '语文老师')
        print(session_res.one())
    
    
  • one_or_none()
    最多返回一行数据或引发异常, 并关闭Result (无数据时返回None, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res1 = session.execute(
            text("select tid, name from teacher where tid < 1;")
        )
        # None
        print(session_res1.one_or_none())
    
        session_res2 = session.execute(
            text("select tid, name from teacher where tid = 1;")
        )
        # (1, '语文老师')
        print(session_res2.one_or_none())
    
    
  • columns(*col_expressions)
    限制返回列, 也可以对列进行重新排序
    即假如结果的列为(a, b, c, d), 但我只需要a和b, 那么只需要result.columns("a", "b"), 你还可以调整它们的顺序, 以方便解包
    注意: 这会修改Result的列, 且该方法的返回值就是修改后的Result对象

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res1 = session.execute(
            text("select tid, name from teacher;")
        )
        session_res2 = session.execute(
            text("select tid, name from teacher;")
        )
    
        for row in session_res1.columns("tid"):
            # 返回值时修改后的Result
    
            # 获取字段名元组
            print(row._fields)  # ('tid',)
    
        # 会修改原Result
        session_res2.columns("name")
        for row in session_res2:
            # 获取字段名元组
            print(row._fields)  # ('name',)
    
    
  • scalar()
    获取第一行的第一列数据, 并关闭Result. 如果没有要获取的行, 则返回None
    如: [(1, "lczmx"), (2, "jack")], 返回 1

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher;")
        )
        print(session_res.scalar())  # 1
    
  • scalar_one()
    只返回一行数据的第一列或引发异常, 并关闭Result (无数据时抛出: sqlalchemy.exc.NoResultFound, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)
    Result.one() + Result.scalar()

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher where tid=1;")
        )
    
        print(session_res.scalar_one())  # 1
    
    
  • scalar_one_or_none()
    最多返回一行数据的第一列或引发异常, 并关闭Result (无数据时返回None, 多行数据时抛出: sqlalchemy.exc.MultipleResultsFound)
    Result.one_or_none() + Result.scalar()

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res1 = session.execute(
            text("select tid, name from teacher where tid<1;")
        )
    
        print(session_res1.scalar_one_or_none())  # None
    
        session_res2 = session.execute(
            text("select tid, name from teacher where tid=1;")
        )
    
        print(session_res2.scalar_one_or_none())  # 1
    
    
  • scalars(index=0)
    返回一个ScalarResult对象, 该对象以每行数据的 第index列 元素作为数据 (而不是ResultRow)
    该对象的方法有: unique partitions fetchall fetchmany all first one_or_none one
    具体使用与Result类似

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher;")
        )
    
        # [1, 2, 3]
        print(session_res.scalars().all())
    
    
  • mappings()
    返回一个MappingResult对象, MappingResult对象与Result对象类似, 但是一行数据使用RowMapping对象表示, RowMapping对象类似于字典对象, 简而言之: 调用该方法, 你可以将一行数据由类元组变为类字典

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher;")
        )
    
        for d in session_res.mappings():
            # 像操作字典一样操作 d 即可
            print(d.get("tid"), d.get("name"))
    
    
  • keys()
    从SQLAlchemy1.4起 (之前的版本返回一个列表), 该方法将返回一个RMKeyView对象, 该对象可迭代, 其_keys属性存放列的名称, 由于实现的__contains__方法, 因此也可以使用in运算符作判断.

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    from collections import Iterable
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher;")
        )
    
        keys = session_res.keys()
        print(keys)  # RMKeyView(['tid', 'name'])
    
        # <class 'sqlalchemy.engine.result.RMKeyView'>
        print(type(keys))
    
        # 可迭代的
        print(isinstance(keys, Iterable))  # True
    
        if "name" in keys:
            print("name in keys")
    
    
  • freeze()
    可以对Result进行缓存, 见官方文档: Re-Executing Statements

  • merge(*others)
    该方法合并其他Result, 返回一个MergedResult对象, 你可以像一般的Result一样操作它, 但是取值的时候注意游标的位置(MergedResult关闭, Result也关闭; MergedResult取完了值, Result的值也被取完)

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res1 = session.execute(
            text("select tid, name from teacher where tid=1;")
        )
        session_res2 = session.execute(
            text("select tid, name from teacher where tid=2;")
        )
    
        session_res = session_res2.merge(session_res1)
    
        # 注意 先session_res2再session_res1
    
        # (2, '英语老师')
        print(session_res.fetchone())
    
        # [(1, '语文老师')]
        print(session_res1.all())
    
        # session_res已经取过一次
        # 所以返回: []
        print(session_res2.all())
    
    
  • partitions(size=None)
    迭代生成size大小的行的子列表, sizeNone时调用Result.yield_per(), 否则调用Result.fetchmany(size)

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res1 = session.execute(
            text("select tid, name from teacher;")
        )
        # 每次迭代 取 1行 数据
        for i in session_res1.partitions():
            print(i)
            """
            [(1, '语文老师')]
            [(2, '英语老师')]
            [(3, '数学老师')]
            """
    
        session_res2 = session.execute(
            text("select tid, name from teacher;")
        )
        # 每次迭代 取 2行 数据
        for i in session_res2.partitions(2):
            print(i)
            """
            [(1, '语文老师'), (2, '英语老师')]
            [(3, '数学老师')]
            """
        # 已经迭代完了, 就没有值了
        print(list(session_res2.partitions()))  # []
    
    
  • yield_per(num)
    迭代num行数据, 返回的是Result对象

    from sqlalchemy import create_engine, text
    from sqlalchemy.orm import declarative_base, sessionmaker
    
    # 导入公共基类
    Base = declarative_base()
    #  数据库配置
    DATABASE_CONFIG = {
        "username": "root",
        "password": "123456",
        "host": "localhost",
        "database": "test"
    }
    # 连接mysql
    engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                           echo=True, future=True)
    Session = sessionmaker(bind=engine)
    
    with Session() as session:
        session_res = session.execute(
            text("select tid, name from teacher;")
        )
        # [(1, '语文老师'), (2, '英语老师'), (3, '数学老师')]
        print(session_res.yield_per(1).all())
    
    
  • close
    关闭此Result, 再操作的话会抛出异常: sqlalchemy.exc.ResourceClosedError: This result object is closed.

Row

一般来说, 一行数据是一个Row对象
常用的属性或方法:

一般使用:
注意: 我们每一次迭代Result对象, 得到的是Row对象

通过属性取值

result = conn.execute(text("select x, y from some_table"))

for row in result:
    y = row.y

    # illustrate use with Python f-strings
    print(f"Row: {row.x} {row.y}")

通过元组解包

result = conn.execute(text("select x, y from some_table"))

for x, y in result:
    # ...

通过索引取值

result = conn.execute(text("select x, y from some_table"))

for row in result:
    x = row[0]

MetaData

MetaData是包含TableEngine的对象, 也就是说它主要是用来管理Table (表)的
下面列出MetaData对象的一些方法

内置函数

常用的SQL函数:

使用例子:

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy import Column, Integer, String
from sqlalchemy import select, func

# 导入公共基类
Base = declarative_base()
#  数据库配置
DATABASE_CONFIG = {
    "username": "root",
    "password": "123456",
    "host": "localhost",
    "database": "test"
}
# 连接mysql
engine = create_engine("mysql+pymysql://{username}:{password}@{host}/{database}".format(**DATABASE_CONFIG),
                       echo=True, future=True)
Session = sessionmaker(bind=engine)


class Teacher(Base):
    __tablename__ = "teacher"

    tid = Column("tid", Integer, primary_key=True, autoincrement=True)

    name = Column("name", String(10), nullable=False, comment="教师名")


with Session() as session:
    session_res = session.execute(
        select(func.count()).select_from(Teacher)
    )
    # (3,)
    print(session_res.one())

Column定义

一个Column即表的一列数据, 和我们用SQL语句定义一列数据一样, 参数主要包括: 字段名、字段类型、约束条件, 比如:

from sqlalchemy import Column, String

Column("name", String(30), unique=True, nullable=False, comment='姓名')

字段类型, 一般你可以用直接指定数据库的字段类型, 也可以让SQLAlchemy的DDL自动选择字段类型
直接使用数据库的字段类型

自动转化的字段类型

约束条件

Column的例子:

from sqlalchemy import DateTime, func


class Data(Base):
    __tablename__ = 'data'
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    created_at = Column(DateTime, server_default=func.now(), comment='创建时间')
    updated_at = Column(DateTime, server_default=func.now(), onupdate=func.now(), comment='更新时间')

01-10 04:41