Python与PostgreSQL的深度整合:CRUD操作全指南

1. 环境准备

1.1 安装必要的包

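pip install sqlalchemy psycopg2-binary sqlmodel

注:4.3 节的异步示例还需要 asyncpg 驱动(pip install asyncpg),5.3 节使用的 EmailStr 还需要 email-validator(pip install email-validator)。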

1.2 数据库连接

from sqlalchemy import create_engine
from sqlmodel import Session, SQLModel

# Connection string format: postgresql://<user>:<password>@<host>:<port>/<database>
DATABASE_URL = "postgresql://username:password@localhost:5432/dbname"

# Create the engine
engine = create_engine(DATABASE_URL)

# Create all tables registered on SQLModel.metadata.
# NOTE(review): in this snippet no table model has been declared yet when this
# line runs, so it would only create tables for models defined or imported
# earlier -- confirm it is executed after the model definitions.
SQLModel.metadata.create_all(engine)

2. 模型定义

2.1 基本模型

from datetime import datetime
from typing import Optional
from sqlmodel import SQLModel, Field

class User(SQLModel, table=True):
    """Table model for a user account."""

    # Primary key; defaults to None (presumably filled in by the database on insert).
    id: Optional[int] = Field(default=None, primary_key=True)
    # Indexed to speed up lookups by username.
    username: str = Field(index=True)
    # Unique constraint: two rows cannot share the same email.
    email: str = Field(unique=True)
    # Set on the Python side when the object is created (naive UTC timestamp).
    created_at: datetime = Field(default_factory=datetime.utcnow)
    # New users are active unless stated otherwise.
    is_active: bool = Field(default=True)

2.2 关系模型

class Post(SQLModel, table=True):
    """Table model for a post written by a user."""

    # Primary key; defaults to None (presumably filled in by the database on insert).
    id: Optional[int] = Field(default=None, primary_key=True)
    title: str
    content: str
    # Foreign key to the ``id`` column of the ``user`` table.
    user_id: int = Field(foreign_key="user.id")
    # Set on the Python side when the object is created (naive UTC timestamp).
    created_at: datetime = Field(default_factory=datetime.utcnow)

3. CRUD 操作

3.1 创建操作 (Create)

def create_user(username: str, email: str) -> User:
    """Insert a new user row and return the persisted object.

    Args:
        username: Value for the ``username`` column.
        email: Value for the ``email`` column.

    Returns:
        The ``User`` instance after it has been committed and refreshed.

    Raises:
        Exception: Whatever the add/commit/refresh sequence raised; the
            session is rolled back before the error is re-raised.
    """
    with Session(engine) as db:
        record = User(username=username, email=email)

        try:
            db.add(record)
            db.commit()
            # Reload the row so database-generated values are populated.
            db.refresh(record)
        except Exception:
            db.rollback()
            raise
        else:
            return record

# Usage example
new_user = create_user("john_doe", "john@example.com")

3.2 查询操作 (Read)

from sqlmodel import select

def get_user_by_id(user_id: int) -> Optional[User]:
    """Look up a user by primary key.

    Args:
        user_id: Primary-key value to look up.

    Returns:
        The matching ``User``, or ``None`` when no row has that id.
    """
    with Session(engine) as db:
        found = db.get(User, user_id)
    return found

def get_user_by_email(email: str) -> Optional[User]:
    """Look up a user by email address.

    Args:
        email: Email value to match exactly.

    Returns:
        The first matching ``User``, or ``None`` when there is no match.
    """
    query = select(User).where(User.email == email)
    with Session(engine) as db:
        found = db.exec(query).first()
    return found

def get_all_users(skip: int = 0, limit: int = 100) -> list[User]:
    """Return one page of users in a stable order.

    Args:
        skip: Number of rows to skip (OFFSET).
        limit: Maximum number of rows to return (LIMIT).

    Returns:
        A list of at most ``limit`` users, ordered by primary key.
    """
    with Session(engine) as session:
        # OFFSET/LIMIT without ORDER BY lets the database return rows in any
        # order, so consecutive pages could overlap or skip rows. Ordering by
        # the primary key makes pagination deterministic.
        statement = select(User).order_by(User.id).offset(skip).limit(limit)
        # ``.all()`` returns a sequence; convert so the result matches the
        # annotated ``list`` return type.
        return list(session.exec(statement).all())

# Complex query example
def get_active_users_with_posts():
    """Return (User, Post) rows for active users, most recently created users first."""
    query = select(User, Post).join(Post)
    # ``== True`` is intentional: it builds a SQL expression rather than
    # testing a Python boolean.
    query = query.where(User.is_active == True)  # noqa: E712
    query = query.order_by(User.created_at.desc())
    with Session(engine) as db:
        rows = db.exec(query).all()
    return rows

3.3 更新操作 (Update)

def update_user(user_id: int, **kwargs) -> Optional[User]:
    """Apply the given attribute values to an existing user and persist them.

    Args:
        user_id: Primary key of the user to update.
        **kwargs: Attribute names mapped to their new values. Names that the
            loaded object does not have are skipped silently.

    Returns:
        The refreshed ``User``, or ``None`` when no user has that id.

    Raises:
        Exception: Whatever the commit/refresh raised; the session is rolled
            back before the error is re-raised.
    """
    with Session(engine) as db:
        target = db.get(User, user_id)
        if not target:
            return None

        for field_name in kwargs:
            if not hasattr(target, field_name):
                # Unknown attribute: ignore it rather than fail.
                continue
            setattr(target, field_name, kwargs[field_name])

        try:
            db.add(target)
            db.commit()
            db.refresh(target)
        except Exception:
            db.rollback()
            raise
        else:
            return target

# Usage example
updated_user = update_user(1, email="newemail@example.com", is_active=False)

3.4 删除操作 (Delete)

def delete_user(user_id: int) -> bool:
    """Delete the user with the given primary key.

    Args:
        user_id: Primary key of the user to delete.

    Returns:
        ``True`` when a row was deleted, ``False`` when no user has that id.

    Raises:
        Exception: Whatever the delete/commit raised; the session is rolled
            back before the error is re-raised.
    """
    with Session(engine) as db:
        doomed = db.get(User, user_id)
        if not doomed:
            return False

        try:
            db.delete(doomed)
            db.commit()
        except Exception:
            db.rollback()
            raise
        else:
            return True

4. 高级用法

4.1 事务管理

from sqlalchemy.orm import Session

def transfer_posts(from_user_id: int, to_user_id: int) -> bool:
    """Reassign every post owned by one user to another in a single transaction.

    Args:
        from_user_id: Current owner whose posts are moved.
        to_user_id: User who receives the posts.

    Returns:
        ``True`` when the transaction committed, ``False`` when any error
        occurred (the error is printed and the transaction is rolled back by
        the ``session.begin()`` context manager).
    """
    # Imported locally: the surrounding snippet uses ``update`` without
    # importing it, so the original always failed with a NameError.
    from sqlalchemy import update

    with Session(engine) as session:
        try:
            # ``session.begin()`` commits on normal exit and rolls back if
            # the body raises.
            with session.begin():
                # Bulk-update the owner of every matching post.
                statement = (
                    update(Post)
                    .where(Post.user_id == from_user_id)
                    .values(user_id=to_user_id)
                )
                # Use ``execute``: the ``Session`` imported just above this
                # function is ``sqlalchemy.orm.Session``, which has no
                # ``exec`` method (that one is SQLModel-specific).
                session.execute(statement)

                # Further operations belonging to the same transaction can
                # be added here.

            return True
        except Exception as e:
            print(f"Error: {e}")
            return False

4.2 批量操作

def bulk_create_users(users_data: list[dict]) -> list[User]:
    """Insert several users in one transaction.

    Args:
        users_data: One dict of ``User`` constructor keyword arguments per
            user to create.

    Returns:
        The created ``User`` objects, each refreshed after the commit.

    Raises:
        Exception: Whatever construction/commit/refresh raised; the session
            is rolled back before the error is re-raised.
    """
    with Session(engine) as db:
        try:
            created = [User(**payload) for payload in users_data]
            db.add_all(created)
            db.commit()

            # Reload each row so database-generated values are populated.
            for record in created:
                db.refresh(record)
        except Exception:
            db.rollback()
            raise
        else:
            return created

4.3 异步操作

from sqlmodel.ext.asyncio.session import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine

# Async database URL (uses the asyncpg driver)
ASYNC_DATABASE_URL = "postgresql+asyncpg://user:password@localhost/dbname"

async_engine = create_async_engine(ASYNC_DATABASE_URL)

async def async_get_user(user_id: int) -> Optional[User]:
    """Fetch a user by id using the async engine.

    Args:
        user_id: Primary-key value to look up.

    Returns:
        The first matching ``User``, or ``None`` when there is no match.
    """
    query = select(User).where(User.id == user_id)
    async with AsyncSession(async_engine) as db:
        rows = await db.exec(query)
        return rows.first()

5. 最佳实践

5.1 连接池配置

engine = create_engine(
    DATABASE_URL,
    pool_size=5,  # connection pool size
    max_overflow=10,  # maximum extra connections allowed beyond pool_size
    pool_timeout=30,  # timeout when acquiring a connection from the pool
    pool_recycle=1800,  # connection recycle interval (seconds)
)

5.2 异常处理

from sqlalchemy.exc import IntegrityError, OperationalError

def safe_create_user(username: str, email: str) -> tuple[Optional[User], str]:
    """Create a user and report the outcome as a message instead of raising.

    Args:
        username: Username for the new user.
        email: Email for the new user.

    Returns:
        ``(user, "Success")`` on success, otherwise ``(None, message)`` where
        the message depends on the kind of exception that was caught.
    """
    try:
        created = create_user(username, email)
    except IntegrityError:
        return None, "User with this email already exists"
    except OperationalError:
        return None, "Database connection error"
    except Exception as e:
        return None, f"Unexpected error: {str(e)}"
    else:
        return created, "Success"

5.3 模型验证

from pydantic import EmailStr, validator

class UserCreate(SQLModel):
    """Input schema for creating a user; validates the email and username."""

    username: str
    email: EmailStr

    @validator('username')
    def username_must_be_valid(cls, v):
        """Accept usernames of three or more characters; reject shorter ones."""
        if len(v) >= 3:
            return v
        raise ValueError('Username must be at least 3 characters long')

6. 性能优化

6.1 查询优化

# Use select_from to make the left side of the join explicit
def get_user_posts_optimized(user_id: int):
    """Return (User, Post) rows for one user's posts using a single joined query.

    Args:
        user_id: Primary key of the user whose posts are fetched.

    Returns:
        All ``(User, Post)`` rows produced by the join for that user.
    """
    with Session(engine) as session:
        # The join already returns each Post alongside its User, so no
        # relationship loader option is needed. The previous
        # ``.options(selectinload(User.posts))`` referenced a name that is
        # never imported and a ``posts`` relationship the ``User`` model does
        # not define, so the function raised on every call.
        statement = (
            select(User, Post)
            .select_from(User)
            .join(Post)
            .where(User.id == user_id)
        )
        return session.exec(statement).all()

6.2 缓存实现

from functools import lru_cache

@lru_cache(maxsize=100)
def get_cached_user(user_id: int) -> Optional[User]:
    """Return the user for ``user_id``, memoizing up to 100 distinct ids.

    ``lru_cache`` stores whatever ``get_user_by_id`` returned, including
    ``None``, and keeps returning that same value for the id until the entry
    is evicted or ``get_cached_user.cache_clear()`` is called.
    """
    cached = get_user_by_id(user_id)
    return cached

7. 调试和监控

7.1 SQL语句日志

import logging

# Install a default handler on the root logger so log records are emitted.
logging.basicConfig()
# Raise the 'sqlalchemy.engine' logger to INFO (the section title describes
# this as SQL statement logging).
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)

7.2 性能分析

from sqlalchemy import event

@event.listens_for(engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    """Push a start timestamp for the statement that is about to execute.

    Timestamps are kept on a stack in ``conn.info['query_start_time']`` and
    popped by the matching ``after_cursor_execute`` hook.
    """
    # Imported locally: the snippet uses ``time`` without importing it, which
    # raised NameError on every query.
    import time

    # perf_counter is monotonic, so the measured duration cannot be skewed by
    # system clock adjustments.
    conn.info.setdefault('query_start_time', []).append(time.perf_counter())

@event.listens_for(engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    """Pop the matching start timestamp and print how long the statement took."""
    # Imported locally for the same reason as in ``before_cursor_execute``.
    import time

    # Must use the same clock as the value pushed by the "before" hook.
    total = time.perf_counter() - conn.info['query_start_time'].pop()
    print(f"Query took {total:.2f} seconds")

8. 部署注意事项

  1. 使用环境变量管理数据库连接信息
  2. 实现连接重试机制
  3. 正确配置连接池
  4. 实现数据库迁移策略
  5. 定期备份数据
  6. 监控数据库性能
  7. 实现错误报告机制

9. 常见问题解决

  1. 连接池耗尽
  2. 死锁处理
  3. 长事务管理
  4. 并发访问控制
  5. 大数据集处理
  6. 内存使用优化

10. 参考资源

12-26 17:26