Python MySQL 进阶用法详解

1. 使用连接池

使用 DBUtils 实现连接池管理:

from dbutils.pooled_db import PooledDB
import pymysql

class DBConnectionPool:
    """Holder for one lazily created ``PooledDB`` pool cached on the class.

    Both methods are static, so they can be called on the class or on an
    instance; every caller receives connections from the same pool object.
    """

    # Cached pool object; remains None until get_pool() is first called.
    _pool = None

    @staticmethod
    def get_pool():
        """Return the cached pool, creating it on the first call."""
        if DBConnectionPool._pool is not None:
            return DBConnectionPool._pool

        # Options consumed by PooledDB itself.
        pool_options = dict(
            creator=pymysql,      # DB-API module used to open connections
            maxconnections=10,    # maximum number of connections in the pool
            mincached=2,          # idle connections created at start-up
            maxcached=5,          # maximum number of idle connections kept
            maxshared=3,          # number of shared connections
            blocking=True,        # block when the connection limit is reached
            maxusage=None,        # max reuse count of one connection (None: no limit)
            setsession=[],        # commands executed before a session starts
            # Controls when the server is pinged to validate a connection.
            # NOTE(review): DBUtils documents 0 as "never ping" - confirm
            # that skipping the liveness check is intended here.
            ping=0,
        )
        # Remaining keywords are forwarded to the creator's connect call.
        connect_options = dict(
            host='localhost',
            port=3306,
            user='root',
            password='123456',
            database='test',
            charset='utf8mb4',
            cursorclass=pymysql.cursors.DictCursor,
        )
        DBConnectionPool._pool = PooledDB(**pool_options, **connect_options)
        return DBConnectionPool._pool

    @staticmethod
    def get_conn():
        """Return a connection obtained from the cached pool."""
        pool = DBConnectionPool.get_pool()
        return pool.connection()

# 使用示例
class UserDAO:
    """Data-access helper that reads rows from the ``users`` table."""

    def get_user(self, user_id):
        """Look up the ``users`` row whose ``id`` equals *user_id*.

        Returns:
            Whatever ``cursor.fetchone()`` yields for the query.
        """
        query = "SELECT * FROM users WHERE id = %s"
        # Both context managers are released when the block exits.
        with DBConnectionPool.get_conn() as conn, conn.cursor() as cursor:
            cursor.execute(query, (user_id,))
            return cursor.fetchone()

2. 实现简单的 ORM 映射

from typing import Dict, Any, Type, TypeVar
from datetime import datetime

# Type variable bound to BaseModel so that from_db_dict() is typed as
# returning an instance of the class it was called on.
T = TypeVar('T', bound='BaseModel')


class Field:
    """Column descriptor holding a SQL type string and a primary-key flag."""

    def __init__(self, field_type: str, primary_key: bool = False):
        self.field_type, self.primary_key = field_type, primary_key


class BaseModel:
    """Minimal base class for objects that mirror a database row.

    Subclasses override ``_table_name`` and ``_fields``; ``_fields`` decides
    which attributes ``to_db_dict()`` exports.
    """

    _table_name: str = ''
    _fields: Dict[str, Field] = {}

    def __init__(self, **kwargs):
        """Store every keyword argument as an attribute of the same name."""
        for attr_name, attr_value in kwargs.items():
            setattr(self, attr_name, attr_value)

    @classmethod
    def from_db_dict(cls: Type[T], db_dict: Dict[str, Any]) -> T:
        """Build an instance by passing *db_dict* as keyword arguments."""
        return cls(**db_dict)

    def to_db_dict(self) -> Dict[str, Any]:
        """Return the declared fields that are present on this object.

        Keys listed in ``_fields`` but not set as attributes are skipped.
        """
        return {
            column: getattr(self, column)
            for column in self._fields
            if hasattr(self, column)
        }

class User(BaseModel):
    """Model for one row of the ``users`` table."""

    _table_name = 'users'
    # Column name -> column descriptor.
    _fields = dict(
        id=Field('BIGINT', primary_key=True),
        username=Field('VARCHAR(50)'),
        password=Field('VARCHAR(100)'),
        age=Field('INT'),
        create_at=Field('TIMESTAMP'),
    )

    def __init__(self, username: str, password: str, age: int,
                 id: int = None, create_at: datetime = None):
        """Create a user; *id* and *create_at* default to None."""
        # BaseModel.__init__ assigns each keyword as an attribute, in the
        # order given here.
        super().__init__(
            id=id,
            username=username,
            password=password,
            age=age,
            create_at=create_at,
        )

class UserRepository:
    """Persists ``User`` objects through an injected connection provider."""

    def __init__(self, db_pool):
        # save() calls db_pool.get_conn() to obtain a connection.
        self.db_pool = db_pool

    def save(self, user: User) -> User:
        """Insert *user* when it has no id, otherwise update its row.

        After an insert, ``cursor.lastrowid`` is written back to ``user.id``.
        The connection is committed before returning.

        Returns:
            The same *user* object that was passed in.
        """
        with self.db_pool.get_conn() as conn, conn.cursor() as cursor:
            is_new = user.id is None
            if is_new:
                statement = (
                    "INSERT INTO users (username, password, age) \n"
                    "                            VALUES (%s, %s, %s)"
                )
                params = (user.username, user.password, user.age)
            else:
                statement = (
                    "UPDATE users SET username=%s, password=%s, age=%s \n"
                    "                            WHERE id=%s"
                )
                params = (user.username, user.password, user.age, user.id)

            cursor.execute(statement, params)
            if is_new:
                user.id = cursor.lastrowid
            conn.commit()
            return user

# Usage example: build a User without an id and persist it via the repository.
user = User(username="张三", password="123456", age=25)
# The repository calls get_conn() on the object passed in here.
repo = UserRepository(DBConnectionPool())
saved_user = repo.save(user)

3. 读写分离实现

from enum import Enum
from typing import List
import random

class DBType(Enum):
    """Role of a database server: master or slave."""

    MASTER = "master"
    SLAVE = "slave"


class DBConfig:
    """Host, port and role of one database server."""

    def __init__(self, host: str, port: int, db_type: DBType):
        self.host, self.port, self.db_type = host, port, db_type

class DBRouter:
    """Hands out connections: master pool for writes, a random slave pool for reads."""

    def __init__(self):
        """Create one pool for the master and one per configured slave."""
        self.master_config = DBConfig("master.mysql", 3306, DBType.MASTER)
        self.slave_configs: List[DBConfig] = [
            DBConfig(slave_host, 3306, DBType.SLAVE)
            for slave_host in ("slave1.mysql", "slave2.mysql")
        ]

        # Pools are built eagerly, master first, then slaves in list order.
        self.master_pool = self._create_pool(self.master_config)
        self.slave_pools = list(map(self._create_pool, self.slave_configs))

    def _create_pool(self, config: DBConfig):
        """Build a ``PooledDB`` pointing at the host and port in *config*."""
        pool_kwargs = {
            'creator': pymysql,
            'maxconnections': 10,
            'host': config.host,
            'port': config.port,
            'user': 'root',
            'password': '123456',
            'database': 'test',
            'charset': 'utf8mb4',
        }
        return PooledDB(**pool_kwargs)

    def get_connection(self, for_write: bool = False):
        """Return a connection from the master pool or a random slave pool.

        Args:
            for_write: When true the master pool is used; otherwise one of
                the slave pools is picked with ``random.choice``.
        """
        pool = self.master_pool if for_write else random.choice(self.slave_pools)
        return pool.connection()

class UserService:
    """User operations that obtain their connections from a ``DBRouter``."""

    def __init__(self, db_router: DBRouter):
        self.db_router = db_router

    def get_user(self, user_id: int):
        """Fetch the ``users`` row with the given id over a read connection.

        Returns:
            The result of ``cursor.fetchone()``.
        """
        query = "SELECT * FROM users WHERE id = %s"
        # Reads request a connection with for_write=False.
        read_conn = self.db_router.get_connection(for_write=False)
        with read_conn as conn, conn.cursor() as cursor:
            cursor.execute(query, (user_id,))
            return cursor.fetchone()

    def create_user(self, user: User):
        """Insert *user* over a write connection and commit.

        Returns:
            ``cursor.lastrowid`` read after the commit.
        """
        statement = (
            "INSERT INTO users (username, password, age) \n"
            "                        VALUES (%s, %s, %s)"
        )
        # Writes request a connection with for_write=True.
        write_conn = self.db_router.get_connection(for_write=True)
        with write_conn as conn, conn.cursor() as cursor:
            values = (user.username, user.password, user.age)
            cursor.execute(statement, values)
            conn.commit()
            return cursor.lastrowid

4. 分库分表实现

from hashlib import md5
from typing import Dict, Optional, Tuple

class ShardingConfig:
    """Shard layout constants read by ``ShardingRouter``."""

    DB_COUNT = 2      # number of databases
    TABLE_COUNT = 4   # number of tables inside each database


class ShardingRouter:
    """Maps a shard key to a (database, table) pair and opens shard connections."""

    @staticmethod
    def get_db_table(user_id: int) -> Tuple[int, int]:
        """Return ``(db_index, table_index)`` for *user_id*.

        The key is converted with ``str()``, hashed with MD5, and the digest
        is read as one big integer. The remainder modulo ``DB_COUNT`` selects
        the database; the quotient modulo ``TABLE_COUNT`` selects the table.
        """
        hash_val = int(md5(str(user_id).encode()).hexdigest(), 16)
        quotient, db_index = divmod(hash_val, ShardingConfig.DB_COUNT)
        table_index = quotient % ShardingConfig.TABLE_COUNT
        return db_index, table_index

    def get_connection(self, db_index: int):
        """Open a new connection to the shard database numbered *db_index*.

        Simplified on purpose: every call opens a fresh connection, whereas
        a real deployment should keep one connection pool per shard.
        """
        config = {
            'host': f'mysql{db_index}.example.com',
            'port': 3306,
            'user': 'root',
            'password': '123456',
            'database': f'test_{db_index}',
            # Same charset and cursor class as the pooled connections used
            # elsewhere in this file: rows come back as dicts and non-ASCII
            # text does not depend on the driver's default charset.
            'charset': 'utf8mb4',
            'cursorclass': pymysql.cursors.DictCursor,
        }
        return pymysql.connect(**config)

class ShardingUserRepository:
    """Reads and writes users that are spread over several databases and tables."""

    def __init__(self):
        self.router = ShardingRouter()

    def get_user(self, user_id: int) -> Optional[Dict]:
        """Fetch the user row with *user_id* from the shard that id maps to.

        Returns:
            The result of ``cursor.fetchone()`` on the shard table.
        """
        db_index, table_index = self.router.get_db_table(user_id)
        with self.router.get_connection(db_index) as conn:
            with conn.cursor() as cursor:
                sql = f"SELECT * FROM users_{table_index} WHERE id = %s"
                cursor.execute(sql, (user_id,))
                return cursor.fetchone()

    def create_user(self, user: User) -> int:
        """Insert *user* into its shard and return the row id.

        When ``user.id`` is set, the shard is chosen from the id and the id
        is stored explicitly, so ``get_user(user.id)`` looks in the same
        database and table. When ``user.id`` is None the shard is chosen
        from the username, as before.

        NOTE(review): rows inserted through the username route are located
        by a different key than ``get_user`` uses, so ``get_user`` may look
        in another shard for them. Assign ids before inserting if rows must
        be retrievable by id.

        Returns:
            ``user.id`` when it was supplied, otherwise ``cursor.lastrowid``.
        """
        has_id = user.id is not None
        if has_id:
            db_index, table_index = self.router.get_db_table(user.id)
            sql = (
                f"INSERT INTO users_{table_index} "
                "(id, username, password, age) VALUES (%s, %s, %s, %s)"
            )
            params = (user.id, user.username, user.password, user.age)
        else:
            # get_db_table() applies str() before hashing, so a username
            # hashes exactly as it did when hashed inline here.
            db_index, table_index = self.router.get_db_table(user.username)
            sql = (
                f"INSERT INTO users_{table_index} \n"
                "                         (username, password, age) VALUES (%s, %s, %s)"
            )
            params = (user.username, user.password, user.age)

        with self.router.get_connection(db_index) as conn:
            with conn.cursor() as cursor:
                cursor.execute(sql, params)
                conn.commit()
                return user.id if has_id else cursor.lastrowid

5. 主从复制配置

5.1 主库配置 (my.cnf)

[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW
sync_binlog = 1

5.2 从库配置 (my.cnf)

[mysqld]
server-id = 2
relay-log = slave-relay-bin
read_only = 1

5.3 主从复制设置

在主库执行:

-- 创建复制用户
CREATE USER 'repl'@'%' IDENTIFIED BY 'password';
GRANT REPLICATION SLAVE ON *.* TO 'repl'@'%';
FLUSH PRIVILEGES;

-- 获取主库状态
SHOW MASTER STATUS;

在从库执行(其中 MASTER_LOG_FILE 和 MASTER_LOG_POS 应替换为主库 SHOW MASTER STATUS 输出中的 File 和 Position 值):

CHANGE MASTER TO
    MASTER_HOST='master_host',
    MASTER_USER='repl',
    MASTER_PASSWORD='password',
    MASTER_LOG_FILE='mysql-bin.000001',
    MASTER_LOG_POS=123;

-- 启动从库复制
START SLAVE;

-- 查看从库状态
SHOW SLAVE STATUS\G

5.4 Python 监控主从状态

class ReplicationMonitor:
    """Collects master and slave replication status and derives the delay."""

    def __init__(self, master_pool, slave_pool):
        # Both objects must expose get_conn(); the status queries call it.
        # NOTE(review): the delay lookup uses dict access, so the slave
        # connection presumably needs a dict-returning cursor - confirm.
        self.master_pool = master_pool
        self.slave_pool = slave_pool

    def check_replication_status(self) -> Dict:
        """Return the master status, the slave status and the delay.

        Returns:
            A dict with the keys ``'master'``, ``'slave'`` and ``'delay'``.
        """
        master_status = self._get_master_status()
        slave_status = self._get_slave_status()

        return {
            'master': master_status,
            'slave': slave_status,
            'delay': self._calculate_delay(master_status, slave_status)
        }

    def _get_master_status(self) -> Dict:
        """Run ``SHOW MASTER STATUS`` on the master and return one row."""
        with self.master_pool.get_conn() as conn:
            with conn.cursor() as cursor:
                cursor.execute("SHOW MASTER STATUS")
                return cursor.fetchone()

    def _get_slave_status(self) -> Dict:
        """Run ``SHOW SLAVE STATUS`` on the slave and return one row."""
        with self.slave_pool.get_conn() as conn:
            with conn.cursor() as cursor:
                cursor.execute("SHOW SLAVE STATUS")
                return cursor.fetchone()

    def _calculate_delay(self, master_status: Dict, slave_status: Dict) -> int:
        """Return the slave's reported delay in seconds, or -1 if unknown.

        -1 is returned when either status is missing or empty, when the
        ``Seconds_Behind_Master`` key is absent, and when its value is None
        (the server reports NULL while replication is not running).
        """
        if not master_status or not slave_status:
            return -1
        delay = slave_status.get('Seconds_Behind_Master')
        # A NULL column arrives as None; map it to the same -1 sentinel so
        # callers always receive an int.
        return -1 if delay is None else delay

# Usage example.
# NOTE(review): master_pool and slave_pool are not defined in this snippet;
# the monitor calls get_conn() on each of them, so supply objects that
# provide that method.
monitor = ReplicationMonitor(master_pool, slave_pool)
status = monitor.check_replication_status()
print(f"主从延迟: {status['delay']} 秒")
12-20 10:44