Python 与 PostgreSQL 的深度整合：CRUD 操作全指南
1. 环境准备
1.1 安装必要的包
pip install sqlalchemy psycopg2-binary sqlmodel asyncpg email-validator
1.2 数据库连接
import os

from sqlalchemy import create_engine
from sqlmodel import Session, SQLModel

# Connection string format: postgresql://<user>:<password>@<host>:<port>/<database>
# Read it from the environment so credentials are not hard-coded in source;
# the literal is only a local-development fallback.
DATABASE_URL = os.environ.get(
    "DATABASE_URL", "postgresql://username:password@localhost:5432/dbname"
)

# Create the engine shared by all sessions below.
engine = create_engine(DATABASE_URL)

# Create every table registered on SQLModel.metadata.
# NOTE: create_all() only sees table models that are already defined/imported
# when it runs. The models in this guide are defined further down, so call
# SQLModel.metadata.create_all(engine) again after they exist; otherwise no
# tables are created.
SQLModel.metadata.create_all(engine)
2. 模型定义
2.1 基本模型
from datetime import datetime, timezone
from typing import Optional

from sqlmodel import Field, Relationship, SQLModel


class User(SQLModel, table=True):
    """A user account row; other tables reference it as ``user.id``."""

    # Primary key; None until the database assigns it on INSERT.
    id: Optional[int] = Field(default=None, primary_key=True)
    # Indexed for lookups by username (not unique).
    username: str = Field(index=True)
    # Unique: inserting a duplicate email raises IntegrityError.
    email: str = Field(unique=True)
    # Naive UTC timestamp. datetime.utcnow() is deprecated since Python 3.12;
    # this yields the same naive-UTC value without the deprecation warning.
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )
    is_active: bool = Field(default=True)

    # ORM relationship to the posts whose post.user_id points at this user.
    # The join is inferred from that foreign key. It enables attribute access
    # (user.posts) and eager-loading options such as selectinload(User.posts).
    posts: list["Post"] = Relationship()
2.2 关系模型
from datetime import timezone


class Post(SQLModel, table=True):
    """A post written by a user; ``user_id`` references ``user.id``."""

    # Primary key; None until the database assigns it on INSERT.
    id: Optional[int] = Field(default=None, primary_key=True)
    title: str
    content: str
    # Foreign key to user.id. PostgreSQL does not create an index for a
    # foreign-key column automatically, and posts are filtered and joined by
    # user_id, so index it explicitly.
    user_id: int = Field(foreign_key="user.id", index=True)
    # Naive UTC timestamp, without the Python 3.12 utcnow() deprecation warning.
    created_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc).replace(tzinfo=None)
    )
3. CRUD 操作
3.1 创建操作 (Create)
def create_user(username: str, email: str) -> User:
    """Insert a new user and return it with database-generated values loaded.

    Args:
        username: Value for the ``username`` column.
        email: Value for the ``email`` column (unique).

    Returns:
        The persisted ``User``, refreshed so that ``id`` is populated.

    Raises:
        sqlalchemy.exc.IntegrityError: e.g. when the email already exists.
        Any database error is re-raised unchanged after rolling back.
    """
    with Session(engine) as session:
        user = User(username=username, email=email)
        try:
            session.add(user)
            session.commit()
            # Reload the row so DB-generated values (such as the id) are
            # available to the caller after the session closes.
            session.refresh(user)
        except Exception:
            session.rollback()
            # Bare `raise` keeps the original traceback intact.
            raise
        return user


# Usage example
new_user = create_user("john_doe", "john@example.com")
3.2 查询操作 (Read)
from sqlmodel import select
def get_user_by_id(user_id: int) -> Optional[User]:
    """Look up a user by primary key; return None when no row matches."""
    with Session(engine) as db_session:
        found = db_session.get(User, user_id)
    return found
def get_user_by_email(email: str) -> Optional[User]:
    """Return the user whose email equals ``email``, or None if there is none."""
    query = select(User).where(User.email == email)
    with Session(engine) as db_session:
        return db_session.exec(query).first()
def get_all_users(skip: int = 0, limit: int = 100) -> list[User]:
    """Return one page of users, ordered by primary key.

    Args:
        skip: Number of rows to skip (OFFSET).
        limit: Maximum number of rows to return (LIMIT).

    Returns:
        A list of at most ``limit`` users.
    """
    with Session(engine) as session:
        # Without ORDER BY, PostgreSQL may return rows in any order, so
        # successive OFFSET/LIMIT pages could overlap or skip rows.
        statement = select(User).order_by(User.id).offset(skip).limit(limit)
        return list(session.exec(statement).all())
# Complex query example
def get_active_users_with_posts() -> list[tuple[User, Post]]:
    """Return (User, Post) rows for active users that have at least one post.

    The inner join drops users without posts, and a user appears once per
    post. Rows are ordered newest user first.
    """
    with Session(engine) as session:
        statement = (
            select(User, Post)
            .join(Post, Post.user_id == User.id)
            # .is_(True) instead of `== True` (flake8 E712); it still renders
            # a SQL boolean test on the column.
            .where(User.is_active.is_(True))
            .order_by(User.created_at.desc())
        )
        return list(session.exec(statement).all())
3.3 更新操作 (Update)
def update_user(user_id: int, **kwargs) -> Optional[User]:
    """Update selected columns of a user and return the refreshed row.

    Only keyword arguments that name a non-primary-key column of the user
    table are applied. Other keys are ignored, so callers cannot overwrite
    ``id`` or arbitrary non-column attributes through ``setattr``.

    Returns:
        The updated ``User``, or None if no user has ``user_id``.

    Raises:
        Any database error (e.g. IntegrityError on a duplicate email) is
        re-raised unchanged after rolling back.
    """
    # Columns that may be changed through this helper.
    updatable = {
        column.name
        for column in User.__table__.columns
        if not column.primary_key
    }
    with Session(engine) as session:
        user = session.get(User, user_id)
        if user is None:
            return None
        for key, value in kwargs.items():
            if key in updatable:
                setattr(user, key, value)
        try:
            session.add(user)
            session.commit()
            session.refresh(user)
        except Exception:
            session.rollback()
            raise
        return user


# Usage example
updated_user = update_user(1, email="newemail@example.com", is_active=False)
3.4 删除操作 (Delete)
def delete_user(user_id: int) -> bool:
    """Delete a user by primary key.

    Returns:
        True if the user existed and was deleted, False if it did not exist.

    Raises:
        Any database error is re-raised unchanged after rolling back. For
        example, an IntegrityError is raised while rows in ``post`` still
        reference this user through the ``post.user_id`` foreign key.
    """
    with Session(engine) as session:
        user = session.get(User, user_id)
        if user is None:
            return False
        try:
            session.delete(user)
            session.commit()
        except Exception:
            session.rollback()
            raise
        return True
4. 高级用法
4.1 事务管理
import logging

from sqlalchemy import update


def transfer_posts(from_user_id: int, to_user_id: int) -> bool:
    """Reassign every post of ``from_user_id`` to ``to_user_id`` atomically.

    Uses sqlmodel's ``Session``. Importing ``Session`` from sqlalchemy.orm
    here would shadow it for the rest of the module, and that class has no
    ``exec()`` method.

    Returns:
        True if the transaction committed. False if any error occurred; in
        that case the transaction is rolled back and the error is logged
        with its traceback.
    """
    with Session(engine) as session:
        try:
            # session.begin(): commit on normal exit, roll back on exception.
            with session.begin():
                statement = (
                    update(Post)
                    .where(Post.user_id == from_user_id)
                    .values(user_id=to_user_id)
                )
                # execute(), not exec(): sqlmodel's exec() is meant for SELECT.
                session.execute(statement)
                # Further statements placed here join the same transaction.
        except Exception:
            logging.getLogger(__name__).exception(
                "Failed to transfer posts from user %s to user %s",
                from_user_id,
                to_user_id,
            )
            return False
        return True
4.2 批量操作
def bulk_create_users(users_data: list[dict]) -> list[User]:
    """Insert several users with a single commit.

    Args:
        users_data: One dict of ``User`` field values per user to create.

    Returns:
        The persisted users, each refreshed so database-generated ids are
        loaded. An empty input returns an empty list.

    Raises:
        Any error is re-raised unchanged after rolling back. If the commit
        itself fails, no user from the batch is inserted.
    """
    with Session(engine) as session:
        try:
            users = [User(**data) for data in users_data]
            session.add_all(users)
            session.commit()
            # One SELECT per user; acceptable for small batches.
            for user in users:
                session.refresh(user)
        except Exception:
            session.rollback()
            raise
        return users
4.3 异步操作
import os

from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel.ext.asyncio.session import AsyncSession

# Async database URL. The "+asyncpg" driver suffix requires the asyncpg
# package. Read it from the environment rather than hard-coding credentials;
# the literal is only a local-development fallback.
ASYNC_DATABASE_URL = os.environ.get(
    "ASYNC_DATABASE_URL", "postgresql+asyncpg://user:password@localhost/dbname"
)
async_engine = create_async_engine(ASYNC_DATABASE_URL)


async def async_get_user(user_id: int) -> Optional[User]:
    """Asynchronously fetch a user by id; return None if no row matches."""
    async with AsyncSession(async_engine) as session:
        statement = select(User).where(User.id == user_id)
        result = await session.exec(statement)
        return result.first()
5. 最佳实践
5.1 连接池配置
# Close the pooled connections of the engine created earlier before replacing
# it; otherwise its pool is simply abandoned.
engine.dispose()
engine = create_engine(
    DATABASE_URL,
    pool_size=5,  # connections kept open in the pool
    max_overflow=10,  # extra connections allowed beyond pool_size
    pool_timeout=30,  # seconds to wait for a free connection before raising
    pool_recycle=1800,  # replace connections older than this many seconds
    pool_pre_ping=True,  # check connections on checkout; replace dead ones
)
5.2 异常处理
from sqlalchemy.exc import IntegrityError, OperationalError


def safe_create_user(username: str, email: str) -> tuple[Optional[User], str]:
    """Create a user, converting exceptions into a ``(user, message)`` pair.

    Returns:
        ``(user, "Success")`` on success. Otherwise ``(None, reason)``, where
        IntegrityError maps to a duplicate-email message, OperationalError
        maps to a connection message, and any other exception maps to its
        text.
    """
    try:
        created = create_user(username, email)
    except IntegrityError:
        outcome = (None, "User with this email already exists")
    except OperationalError:
        outcome = (None, "Database connection error")
    except Exception as exc:
        outcome = (None, f"Unexpected error: {exc}")
    else:
        outcome = (created, "Success")
    return outcome
5.3 模型验证
from pydantic import EmailStr
from sqlmodel import Field


class UserCreate(SQLModel):
    """Input schema for creating a user (validation only; not a table).

    ``EmailStr`` requires the ``email-validator`` package to be installed.
    """

    # Declarative "at least 3 characters" rule. It replaces the pydantic-v1
    # @validator decorator, which is deprecated in pydantic v2, and works
    # with either major version. Violations raise a ValidationError.
    username: str = Field(min_length=3)
    email: EmailStr
6. 性能优化
6.1 查询优化
# One round trip: fetch a user together with their posts via a single JOIN.
def get_user_posts_optimized(user_id: int):
    """Return ``(User, Post)`` rows for every post written by ``user_id``.

    A single SELECT joining ``user`` to ``post`` returns both entities, so no
    extra eager-loading query (e.g. selectinload) is needed. Returns an empty
    list when the user has no posts or does not exist. Rows are ordered by
    post creation time.
    """
    with Session(engine) as session:
        statement = (
            select(User, Post)
            .select_from(User)
            .join(Post, Post.user_id == User.id)
            .where(User.id == user_id)
            .order_by(Post.created_at)
        )
        return list(session.exec(statement).all())
6.2 缓存实现
from functools import lru_cache


@lru_cache(maxsize=100)
def get_cached_user(user_id: int) -> Optional[User]:
    """Memoized ``get_user_by_id`` (keeps the 100 most recently used ids).

    Caveats:
    - Entries are never invalidated, so results go stale after
      ``update_user``/``delete_user``.
    - A ``None`` result for a missing id is cached as well.
    - Every caller receives the same detached ``User`` instance.

    Call ``get_cached_user.cache_clear()`` after writes.
    """
    cached_user = get_user_by_id(user_id)
    return cached_user
7. 调试和监控
7.1 SQL语句日志
import logging

# Attach a default handler to the root logger, then raise SQLAlchemy's engine
# logger to INFO so the SQL statements it executes are written to the log.
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(level=logging.INFO)
7.2 性能分析
import time

from sqlalchemy import event


@event.listens_for(engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    """Push the statement's start time onto a per-connection stack.

    The matching ``after_cursor_execute`` listener pops it.
    """
    # perf_counter() is monotonic and high-resolution, unlike time.time().
    conn.info.setdefault('query_start_time', []).append(time.perf_counter())


@event.listens_for(engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    """Pop the start time and print how long the statement took."""
    total = time.perf_counter() - conn.info['query_start_time'].pop()
    # Four decimals: with two, most queries would print as 0.00 seconds.
    print(f"Query took {total:.4f} seconds")
8. 部署注意事项
- 使用环境变量管理数据库连接信息
- 实现连接重试机制
- 正确配置连接池
- 实现数据库迁移策略
- 定期备份数据
- 监控数据库性能
- 实现错误报告机制
9. 常见问题解决
- 连接池耗尽
- 死锁处理
- 长事务管理
- 并发访问控制
- 大数据集处理
- 内存使用优化
10. 参考资源