SQLALchemy
SQLALchemy
是Python
中的一款优秀的ORM
框架,它可以作用于任何第三方Web
框架,如flask
,tornado
等框架。
SQLALchemy
相较于DjangoORM
来说更加的贴近原生SQL
语句,因此学习难度较低。
下载SQLALchemy
模块:
pip install sqlalchemy
值得注意的是SQLALchemy
必须依赖其他操纵数据的模块,Dialect
用于和数据API
进行交流,根据配置文件的不同调用不同的数据库API
,从而实现对数据库的操作,如:
MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
表操作
SQLALchemy
中不允许修改表结构,如果修改表结构则需要删除旧表,再创建新表:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import datetime
from sqlalchemy import Column, Integer, String, DateTime, UniqueConstraint, Index
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
# 基础类
Base = declarative_base()
# 创建引擎
engine = create_engine(
"mysql+pymysql://[email protected]:3306/db1?charset=utf8",
# "mysql+pymysql://root:[email protected]:3306/db1?charset=utf8", # 有密码时
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
class Users(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False)
age = Column(Integer,nullable=False)
phone = Column(String(11))
addr = Column(String(64), nullable=True)
create_time = Column(DateTime, default=datetime.datetime.now) # 一定不要加括号
__table_args__ = (
UniqueConstraint("id", "name"), # 创建联合唯一 可指定name给个别名
Index("phone", "addr", unique=True), # 创建联合唯一索引 可指定name给个别名
)
def __str__(self):
return "object:<id:%s name:%s>" % (self.id, self.name)
def create_tb():
"""
创建表
:return:
"""
Base.metadata.create_all(engine)
def drop_tb():
"""
删除表
:return:
"""
Base.metadata.drop_all(engine)
if __name__ == '__main__':
drop_tb()
create_tb()
链接库
表创建好之后,开始链接库。
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
单表记录
新增记录
新增单条记录:
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
user_obj = Users(name="user001", phone="15125352333",age=23, addr="China")
session.add(user_obj)
# 提交
session.commit()
# 关闭链接(可使用session.remove())
session.close()
修改记录
修改记录:
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
# 修改名字
session.query(Users).filter_by(id=1).update({"name": "USER001"})
# 修改年龄,使用+号,默认为"fetch",代表只允许int类型使用+号
session.query(Users).filter_by(id=1).update({"age": Users.age + 1},synchronize_session="fetch")
# 修改地址,使用+号,由于是字符类型,所以要修改synchronize_session=False
session.query(Users).filter_by(id=1).update({"addr":Users.addr + "BeiJing"},synchronize_session=False)
# 提交
session.commit()
# 关闭链接
session.close()
删除记录
删除案例:
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
session.query(Users).filter_by(id=2).delete()
# 提交
session.commit()
# 关闭链接
session.close()
批量增加
批量增加:
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
# 批量增加
session.add_all([
Users(name="user002",age=21,phone="13269867233",addr="ShangHai"),
Users(name="user003",age=18,phone="13269867234",addr="GuangZhou"),
Users(name="user003",age=24,phone="13269867235",addr="ChongQing"),
])
# 提交
session.commit()
# 关闭链接
session.close()
单表查询
基本查询
基本查询:
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
# 查询
# -- 查所有 --
result_01 = session.query(Users).all()
# -- 过滤 --
result_02 = session.query(Users).filter(Users.name == "USER001").all() # Python表达式的形式过滤
result_03 = session.query(Users).filter_by(name="user002").all() # ORM形式过滤
result_04 = session.query(Users).filter_by(name="user003").first() # ORM形式过滤 取第一个
print(result_01) # [<models.Users>,<models.Users>,<models.Users>]
print(result_02)
print(result_03)
print(result_04) # object:<id:3 name:user003> 通过__str__拿到结果
# 提交
session.commit()
# 关闭链接
session.close()
其他过滤
条件查询:
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
# 只拿某字段
result_00 = session.query(Users.name,Users.age).first()
print(result_00)
# and(用逗号或者用and_)
result_01 = session.query(Users).filter( Users.id > 1,Users.age < 23).all()
print(result_01)
from sqlalchemy import and_
result_02 = session.query(Users).filter(and_( Users.id > 1,Users.age < 23)).all()
print(result_02)
# or
from sqlalchemy import or_
result_03 = session.query(Users).filter(or_(Users.id > 3,Users.age < 23)).all()
print(result_03)
# and与or的组合使用
result_04 = session.query(Users).filter(or_(
Users.id > 1,
and_(Users.id > 2, Users.age < 24)
)).all()
print(result_04)
# 范围
result_05 = session.query(Users).filter(Users.age.between(18,24)).all()
print(result_05)
# 包含
result_06 = session.query(Users).filter(Users.age.in_([18,21,24])).all()
print(result_06)
# 取反 ~
result_07 = session.query(Users).filter(~Users.age.in_([18,21,24])).all()
print(result_07)
# 通配符
result_08 = session.query(Users).filter(Users.name.like("us%")).all()
print(result_08)
# 分页
result_09 = session.query(Users).all()[0:1]
print(result_09)
# 排序
result_10 = session.query(Users).order_by(Users.id.desc()).all() # 倒序
print(result_10)
result_11 = session.query(Users).order_by(Users.id.asc()).all() # 正序
print(result_11)
# 分组
result_12 = session.query(Users).group_by(Users.id).all()
print(result_12)
# 聚合函数
from sqlalchemy.sql import func
result_13 = session.query(
func.max(Users.age),
func.sum(Users.age),
func.min(Users.age),
).group_by(Users.name).having(func.max(Users.age > 12)).all()
print(result_13)
# 提交
session.commit()
# 关闭链接
session.close()
多表相关
一对多
首先是建立一对多的关系,使用relationship
做逻辑一对多,不会在物理表中创建关系,但是可以通过该字段进行增删改查:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
# 基础类
Base = declarative_base()
# 创建引擎
engine = create_engine(
"mysql+pymysql://[email protected]:3306/db1?charset=utf8",
# "mysql+pymysql://root:[email protected]:3306/db1?charset=utf8", # 有密码时
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
class Classes(Base):
__tablename__ = "classes"
id = Column(Integer, primary_key=True)
name = Column(String(32), nullable=False)
class Students(Base):
__tablename__ = "students"
id = Column(Integer, primary_key=True)
name = Column(String(32), nullable=False)
# 真实约束字段:避免脏数据写入,在物理表中会创建真实字段关系
# 可选级联操作:CASCADE,DELETE、RESTRICT
fk_class = Column(Integer, ForeignKey("classes.id",ondelete="CASCADE",onupdate="CASCADE"))
# 逻辑关系字段:不会在真实物理表中创建字段,但是可以通过该逻辑字段进行增删改查
# backref:反向查询的名字
re_class = relationship("Classes",backref="students")
def create_tb():
"""
创建表
:return:
"""
Base.metadata.create_all(engine)
def drop_tb():
"""
删除表
:return:
"""
Base.metadata.drop_all(engine)
if __name__ == '__main__':
drop_tb()
create_tb()
通过逻辑字段进行增加:
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
session.add_all(
[
Students(name="学生01", re_class=Classes(name="一年级一班")), # 自动填入fk_class
Students(name="学生02", re_class=Classes(name="一年级二班")),
]
)
# 提交
session.commit()
# 关闭链接
session.close()
多对多
多对多也使用relationship
做逻辑多对多,不会在物理表中创建关系,但是可以通过该字段进行增删改查。
使用relationship
时,传入指定手动生成的第三张表,代表这是多对多关系:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
# 基础类
Base = declarative_base()
# 创建引擎
engine = create_engine(
"mysql+pymysql://[email protected]:3306/db1?charset=utf8",
# "mysql+pymysql://root:[email protected]:3306/db1?charset=utf8", # 有密码时
max_overflow=0, # 超过连接池大小外最多创建的连接
pool_size=5, # 连接池大小
pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
)
class Classes(Base):
__tablename__ = "classes"
id = Column(Integer, primary_key=True)
name = Column(String(32), nullable=False)
class Students(Base):
__tablename__ = "students"
id = Column(Integer, primary_key=True)
name = Column(String(32), nullable=False)
# 可选级联操作:CASCADE,DELETE、RESTRICT
fk_class = Column(Integer, ForeignKey("classes.id", ondelete="CASCADE", onupdate="CASCADE"))
# 逻辑关系字段:不会在真实物理表中创建字段,但是可以通过该逻辑字段进行增删改查
# backref:反向查询的名字
re_class = relationship("Classes", backref="students")
class Teachers(Base):
__tablename__ = "teachers"
id = Column(Integer, primary_key=True)
name = Column(String(32), nullable=False)
# 逻辑字段M2M:指定第三张表,secondary参数为__tablename__,反向查询为teachers
re_class = relationship("Classes", secondary="teachersm2mclasses", backref="teachers")
class TeachersM2mClasses(Base):
__tablename__ = "teachersm2mclasses"
id = Column(Integer, primary_key=True)
teacher_id = Column(Integer, ForeignKey("teachers.id"))
class_id = Column(Integer, ForeignKey("classes.id"))
__table_args__ = (
UniqueConstraint("teacher_id", "class_id"), # 创建联合唯一 可指定name给个别名
)
def create_tb():
"""
创建表
:return:
"""
Base.metadata.create_all(engine)
def drop_tb():
"""
删除表
:return:
"""
Base.metadata.drop_all(engine)
if __name__ == '__main__':
drop_tb()
create_tb()
用一个列表,将班级的记录对象放进去,你可以用多种增加方式,使用逻辑字段添加或自己操纵第三张表:
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
session.add_all(
[
Teachers(name="老师01",re_class=[
session.query(Classes).filter_by(id=1).first()
]),
Teachers(name="老师02",re_class=[
session.query(Classes).filter_by(id=1).first()
]),
Teachers(name="老师03",re_class=[
session.query(Classes).filter_by(id=2).first()
]),
]
)
# 提交
session.commit()
# 关闭链接
session.close()
组合查询
组合查询将两张表用笛卡尔积的效果显现出来:
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
# 必须用filter,获取全部也是,不可以使用all因为他会返回一个list,list不具备union_all
# 使用filter返回的对象是:<class 'sqlalchemy.orm.query.Query'>
# 并且query中必须单拿某一个字段,如果不指定字段就直接返回对象
s = session.query(Students.name).filter()
t = session.query(Teachers.name).filter()
c = session.query(Classes.name).filter()
ret = s.union_all(t).union_all(c).all() # 用列表显示
print(ret)
# [('学生01',), ('学生02',), ('老师01',), ('老师02',), ('老师03',), ('一年级一班',), ('一年级二班',)]
# 提交
session.commit()
# 关闭链接
session.close()
连表查询
使用join
进行连表查询:
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
# 手动指定条件查询
result = session.query(Students.name, Classes.name).filter(Students.id == Classes.id).all()
for i in result:
print(i)
# 连接查询,同上,内部自动指定 Students.fk_class == Classes.id 的条件
result = session.query(Students.name, Classes.name).join(Classes).all()
# 相当于:result = session.query(Students.name,Classes.name).join(Classes, Students.fk_class == Classes.id).all()
for i in result:
print(i)
# 左链接查询,即使有同学没有班级也拿出来
result = session.query(Students.name, Classes.name).join(Classes, isouter=True).all()
for i in result:
print(i)
# 如果想查看有哪些班级没有同学,就换一个位置
result = session.query(Students.name, Classes.name).join(Students, isouter=True).all()
for i in result:
print(i)
# 三表查询,需要自己指定条件
result = session.query(Teachers.name, Classes.name, TeachersM2mClasses.id) \
.join(Teachers, TeachersM2mClasses.teacher_id == Teachers.id) \
.join(Classes, TeachersM2mClasses.class_id == Classes.id) \
.filter() # 查看原生语句
print(result)
for i in result:
print(i)
# 提交
session.commit()
# 关闭链接
session.close()
正反查询
上面是使用join
进行的连表查询,其实也可以使用逻辑字段relationship
查询。
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
# 正向:查看第一个老师都在哪些班级(通过逻辑字段的名字)
result = session.query(Teachers).first()
# result.re_class是一个列表,存了有关该老师所在的班级 <class 'sqlalchemy.orm.collections.InstrumentedList'>
for class_obj in result.re_class: # 查看其所有的班级
print(class_obj.name)
# 反向:查看第一个班级下都有哪些老师,都有哪些学生(通过逻辑字段中的backref参数进行反向查询)
result = session.query(Classes).first()
# 看老师
for teacher_obj in result.teachers:
print(teacher_obj.name)
# 看学生
for student_obj in result.students:
print(student_obj.name)
# 提交
session.commit()
# 关闭链接
session.close()
正反方法
使用逻辑字段relationship
可拥有一些方法执行增删改。
由于逻辑字段是一个类似列表的存在,所以列表的方法都能用。
print(type(老师对象.班级列表))
# <class 'sqlalchemy.orm.collections.InstrumentedList'>
比如使用extend
方法增加老师的班级:
# 给老师增加班级
result = session.query(Teachers).first()
# extend方法:
result.re_class.extend([
Classes(name="三年级一班",),
Classes(name="三年级二班",),
])
使用remove
方法删除老师的班级:
# 减少老师所在的班级
result = session.query(Teachers).first()
# 待删除的班级对象,集合查找比较快
delete_class_set = {
session.query(Classes).filter_by(id=7).first(),
session.query(Classes).filter_by(id=8).first(),
}
# 循换老师所在的班级
# remove方法:
for class_obj in result.re_class:
if class_obj in delete_class_set:
result.re_class.remove(class_obj)
使用clear
清空老师所对应的班级:
# 拿出一个老师
result = session.query(Teachers).first()
result.re_class.clear()
原生SQL
查看SQL命令
如果一条查询语句是以filter
结尾,则返回结果对象的__str__
方法中都是SQL
语句:
result = session.query(Teachers).filter()
print(result)
# SELECT teachers.id AS teachers_id, teachers.name AS teachers_name
# FROM teachers
如果是all
结尾,返回的就是一个列表,first
结尾也是一个列表:
result = session.query(Teachers).all()
print(result)
# [<models.Teachers object at 0x00000178EB0B5550>, <models.Teachers object at 0x00000178EB0B5518>, <models.Teachers object at 0x00000178EB0B5048>]
执行SQL语句
执行原生SQL
:
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
# 导入引擎,模型表等
from models import *
# 通过Session绑定引擎和数据库建立关系
Session = sessionmaker(bind=engine)
# 创建链接池,使用session即可为当前线程拿出一个链接对象。内部采用threading.local进行隔离
session = scoped_session(Session)
cursor = session.execute(r"select * from students where id <= (:num)",params={"num":2})
print(cursor.fetchall())
# 提交
session.commit()
# 关闭链接
session.close()