跳转至

04-MySQL实战

MySQL实战

最流行的开源关系型数据库 目标:掌握MySQL的安装配置、日常操作和Python集成


📋 本章概览

预计学习时间:4-5小时 前置章节第01章:数据库基础概念第02章:SQL核心语法第03章:数据库设计与范式 实践要求:完成本章所有动手操作,搭建一个完整的用户管理系统

本章内容: 1. MySQL安装与配置 2. MySQL数据类型详解 3. 存储引擎选择 4. 用户权限管理 5. 备份与恢复 6. Python操作MySQL 7. 实战:用户管理系统


1. MySQL安装与配置

1.1 Windows安装

方式1:官方安装包

Bash
# 1. 下载MySQL Installer
# https://dev.mysql.com/downloads/installer/

# 2. 选择安装类型:
#    - Server only(仅服务器)
#    - Full(完整安装,包含Workbench等工具)

# 3. 配置root密码
# 4. 记住端口号(默认3306)

方式2:使用Chocolatey

PowerShell
# 安装Chocolatey后
choco install mysql

# 启动服务
net start mysql

1.2 macOS安装

Bash
# 使用Homebrew安装
brew install mysql

# 启动服务
brew services start mysql

# 初始化(设置root密码)
mysql_secure_installation

1.3 Linux安装(Ubuntu)

Bash
# 安装
sudo apt update
sudo apt install mysql-server

# 启动服务
sudo systemctl start mysql
sudo systemctl enable mysql

# 安全配置
sudo mysql_secure_installation

1.4 Docker安装(推荐)

Bash
# 拉取镜像
docker pull mysql:8.0

# 运行容器
docker run -d \
  --name mysql8 \
  -p 3306:3306 \
  -e MYSQL_ROOT_PASSWORD=your_password \
  -v mysql_data:/var/lib/mysql \
  mysql:8.0

# 进入容器
docker exec -it mysql8 mysql -uroot -p

1.5 基本配置

配置文件位置: - Windows: C:\ProgramData\MySQL\MySQL Server 8.0\my.ini - Linux/macOS: /etc/mysql/my.cnf/etc/my.cnf

常用配置

INI
[mysqld]
# 端口号
port=3306

# 数据目录
datadir=/var/lib/mysql

# 字符集
character-set-server=utf8mb4
collation-server=utf8mb4_unicode_ci

# 最大连接数
max_connections=200

# 缓冲区大小
innodb_buffer_pool_size=1G

# 慢查询日志
slow_query_log=1
slow_query_log_file=/var/log/mysql/slow.log
long_query_time=2

# 错误日志
log_error=/var/log/mysql/error.log

1.6 连接MySQL

命令行连接

Bash
# 本地连接
mysql -u root -p

# 远程连接
mysql -h 192.168.1.100 -P 3306 -u root -p

# 指定数据库
mysql -u root -p database_name

常用命令

SQL
-- 查看版本
SELECT VERSION();

-- 查看所有数据库
SHOW DATABASES;

-- 创建数据库
CREATE DATABASE mydb CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;

-- 使用数据库
USE mydb;

-- 查看当前数据库
SELECT DATABASE();

-- 查看所有表
SHOW TABLES;

-- 查看表结构
DESCRIBE users;
SHOW CREATE TABLE users;

-- 查看当前用户
SELECT USER();

-- 查看所有用户
SELECT user, host FROM mysql.user;


2. MySQL数据类型详解

2.1 数值类型

类型 大小 范围 用途
TINYINT 1字节 -128~127 小整数
SMALLINT 2字节 -32768~32767 较小整数
MEDIUMINT 3字节 -838万~838万 中等整数
INT 4字节 -21亿~21亿 标准整数
BIGINT 8字节 极大范围 大整数
FLOAT 4字节 - 单精度浮点
DOUBLE 8字节 - 双精度浮点
DECIMAL(M,D) 变长 依赖M,D 精确小数
SQL
-- 示例
CREATE TABLE products (
    product_id INT PRIMARY KEY AUTO_INCREMENT,
    stock TINYINT UNSIGNED,           -- 库存:0-255
    view_count INT UNSIGNED,          -- 浏览量:0-42亿
    price DECIMAL(10,2),              -- 价格:最大99999999.99
    weight FLOAT                      -- 重量:近似值
);

2.2 字符串类型

类型 最大长度 特点 用途
CHAR(N) 255字符 定长,速度快 固定长度(如手机号)
VARCHAR(N) 65535字节 变长,省空间 大多数字符串
TINYTEXT 255字节 - 短文本
TEXT 64KB - 文章、描述
MEDIUMTEXT 16MB - 长文本
LONGTEXT 4GB - 极大文本
JSON - JSON格式 结构化数据
SQL
-- 示例
CREATE TABLE articles (
    article_id INT PRIMARY KEY AUTO_INCREMENT,
    title VARCHAR(200) NOT NULL,      -- 标题,最多200字符
    summary VARCHAR(500),             -- 摘要
    content TEXT,                     -- 正文
    tags JSON,                        -- 标签:["AI", "Python"]
    phone CHAR(11)                    -- 手机号,定长11位
);

2.3 日期时间类型

类型 格式 范围 用途
DATE YYYY-MM-DD 1000-9999 日期
TIME HH:MM:SS -838:59:59 ~ 838:59:59 时间
DATETIME YYYY-MM-DD HH:MM:SS 1000-9999 日期时间
TIMESTAMP YYYY-MM-DD HH:MM:SS 1970-2038 时间戳
YEAR YYYY 1901-2155 年份
SQL
-- 示例
CREATE TABLE events (
    event_id INT PRIMARY KEY AUTO_INCREMENT,
    event_name VARCHAR(100),
    event_date DATE,                  -- 如:2024-01-15
    start_time TIME,                  -- 如:09:00:00
    created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- 时间函数
SELECT NOW();                         -- 当前日期时间
SELECT CURDATE();                     -- 当前日期
SELECT CURTIME();                     -- 当前时间
SELECT DATE_FORMAT(created_at, '%Y-%m-%d') FROM events;
SELECT DATEDIFF('2024-12-31', '2024-01-01');  -- 日期差

2.4 其他类型

SQL
-- 枚举类型
CREATE TABLE users (
    gender ENUM('男', '女', '保密') DEFAULT '保密',
    status ENUM('active', 'inactive', 'banned') DEFAULT 'active'
);

-- 布尔类型(实际用TINYINT(1))
CREATE TABLE tasks (
    is_completed BOOLEAN DEFAULT FALSE,
    is_urgent BOOL DEFAULT FALSE
);

-- 二进制数据
CREATE TABLE files (
    file_name VARCHAR(255),
    file_data BLOB,                   -- 二进制大对象
    file_size INT
);

3. 存储引擎

3.1 常用存储引擎对比

特性 InnoDB MyISAM Memory
事务支持
行级锁 ❌(表锁) ❌(表锁)
外键
崩溃恢复
全文索引 ✅(5.6+)
适用场景 默认、通用 读多写少 临时数据

3.2 InnoDB详解

特点: - MySQL 5.5+ 的默认引擎 - 支持事务(ACID) - 支持行级锁(并发性能好) - 支持外键约束 - 支持崩溃恢复

SQL
-- 创建InnoDB表(默认)
CREATE TABLE users (
    user_id INT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50)
) ENGINE=InnoDB;

-- 查看表的存储引擎
SHOW TABLE STATUS WHERE Name = 'users';

-- 修改存储引擎
ALTER TABLE users ENGINE=InnoDB;

3.3 存储引擎选择建议

  • 默认选择InnoDB:满足99%的场景
  • MyISAM:只读或读多写少的报表系统
  • Memory:临时表、缓存表(重启丢失)
  • Archive:归档历史数据

4. 用户权限管理

4.1 创建用户

SQL
-- 创建本地用户
CREATE USER 'zhangsan'@'localhost' IDENTIFIED BY 'password123';

-- 创建远程用户(指定IP)
CREATE USER 'lisi'@'192.168.1.%' IDENTIFIED BY 'password123';

-- 创建任意主机用户(不推荐)
CREATE USER 'wangwu'@'%' IDENTIFIED BY 'password123';

-- 修改密码
ALTER USER 'zhangsan'@'localhost' IDENTIFIED BY 'newpassword';

-- 删除用户
DROP USER 'zhangsan'@'localhost';

4.2 授权管理

SQL
-- 授权语法
GRANT 权限 ON 数据库. TO '用户'@'主机';

-- 常用权限
-- ALL PRIVILEGES:所有权限
-- SELECT:查询
-- INSERT:插入
-- UPDATE:更新
-- DELETE:删除
-- CREATE:创建表/数据库
-- DROP:删除表/数据库
-- INDEX:创建/删除索引

-- 示例1:授予只读权限
GRANT SELECT ON mydb.* TO 'readonly'@'%';

-- 示例2:授予读写权限
GRANT SELECT, INSERT, UPDATE, DELETE ON mydb.* TO 'readwrite'@'%';

-- 示例3:授予特定表权限
GRANT SELECT, INSERT ON mydb.users TO 'app_user'@'%';

-- 示例4:授予所有权限(管理员)
GRANT ALL PRIVILEGES ON *.* TO 'admin'@'%' WITH GRANT OPTION;

-- 刷新权限(立即生效)
FLUSH PRIVILEGES;

-- 查看用户权限
SHOW GRANTS FOR 'zhangsan'@'localhost';

-- 撤销权限
REVOKE DELETE ON mydb.* FROM 'readwrite'@'%';

4.3 角色管理(MySQL 8.0+)

SQL
-- 创建角色
CREATE ROLE 'app_read', 'app_write', 'app_admin';

-- 给角色授权
GRANT SELECT ON mydb.* TO 'app_read';
GRANT SELECT, INSERT, UPDATE, DELETE ON mydb.* TO 'app_write';
GRANT ALL PRIVILEGES ON mydb.* TO 'app_admin';

-- 给用户分配角色
GRANT 'app_read' TO 'zhangsan'@'localhost';
GRANT 'app_write' TO 'lisi'@'localhost';

-- 设置默认角色
SET DEFAULT ROLE 'app_read' TO 'zhangsan'@'localhost';

-- 激活角色
SET ROLE 'app_read';

-- 查看当前角色
SELECT CURRENT_ROLE();

5. 备份与恢复

5.1 使用mysqldump备份

Bash
# 备份单个数据库
mysqldump -u root -p mydb > mydb_backup.sql

# 备份多个数据库
mysqldump -u root -p --databases db1 db2 > backup.sql

# 备份所有数据库
mysqldump -u root -p --all-databases > all_backup.sql

# 仅备份表结构(不含数据)
mysqldump -u root -p --no-data mydb > schema.sql

# 仅备份数据(不含结构)
mysqldump -u root -p --no-create-info mydb > data.sql

# 压缩备份
mysqldump -u root -p mydb | gzip > mydb_backup.sql.gz

5.2 恢复数据

Bash
# 恢复数据库
mysql -u root -p mydb < mydb_backup.sql

# 恢复前创建数据库
mysql -u root -p -e "CREATE DATABASE IF NOT EXISTS mydb;"
mysql -u root -p mydb < mydb_backup.sql

# 解压并恢复
gunzip < mydb_backup.sql.gz | mysql -u root -p mydb

5.3 物理备份(InnoDB)

Bash
# 使用Percona XtraBackup(热备份,不锁表)
# 安装:https://www.percona.com/software/mysql-database/percona-xtrabackup

# 全量备份
xtrabackup --backup --target-dir=/backup/full

# 增量备份
xtrabackup --backup --target-dir=/backup/inc1 --incremental-basedir=/backup/full

# 恢复
xtrabackup --prepare --target-dir=/backup/full
xtrabackup --copy-back --target-dir=/backup/full

5.4 定时备份脚本

Bash
#!/bin/bash
# backup.sh

DB_NAME="mydb"
BACKUP_DIR="/backup/mysql"
DATE=$(date +%Y%m%d_%H%M%S)  # $()命令替换:执行命令并获取输出
BACKUP_FILE="$BACKUP_DIR/${DB_NAME}_$DATE.sql.gz"

# 保留最近7天的备份
RETENTION_DAYS=7

# 执行备份(使用 --defaults-extra-file 避免密码暴露在命令行)
# 创建 /root/.my_backup.cnf 内容:[mysqldump] user=root password=xxx
# chmod 600 /root/.my_backup.cnf
mysqldump --defaults-extra-file=/root/.my_backup.cnf $DB_NAME | gzip > $BACKUP_FILE  # |管道:将前一命令的输出作为后一命令的输入

# 检查备份是否成功
if [ $? -eq 0 ]; then  # 条件测试:-f文件存在 -d目录存在 -z空字符串
    echo "Backup successful: $BACKUP_FILE"
else
    echo "Backup failed!"
    exit 1
fi

# 删除旧备份
find $BACKUP_DIR -name "${DB_NAME}_*.sql.gz" -mtime +$RETENTION_DAYS -delete

echo "Cleanup completed"

添加到crontab:

Bash
# 每天凌晨2点备份
0 2 * * * /path/to/backup.sh >> /var/log/mysql_backup.log 2>&1


6. Python操作MySQL

6.1 使用PyMySQL

安装

Bash
pip install pymysql

基础操作

Python
import pymysql

# 建立连接
conn = pymysql.connect(
    host='localhost',
    port=3306,
    user='root',
    password='your_password',
    database='mydb',
    charset='utf8mb4'
)

try:
    # 创建游标
    with conn.cursor() as cursor:
        # 执行SQL
        cursor.execute("SELECT VERSION()")
        result = cursor.fetchone()
        print(f"MySQL版本: {result[0]}")

        # 创建表
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id INT PRIMARY KEY AUTO_INCREMENT,
                username VARCHAR(50) NOT NULL,
                email VARCHAR(100) UNIQUE,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # 插入数据
        cursor.execute(
            "INSERT INTO users (username, email) VALUES (%s, %s)",
            ('张三', 'zhangsan@example.com')
        )

        # 批量插入
        users = [
            ('李四', 'lisi@example.com'),
            ('王五', 'wangwu@example.com'),
            ('赵六', 'zhaoliu@example.com')
        ]
        cursor.executemany(
            "INSERT INTO users (username, email) VALUES (%s, %s)",
            users
        )

        # 提交事务
        conn.commit()

        # 查询数据
        # ⚠️ 生产环境应避免 SELECT *,明确指定需要的列(如 SELECT id, username, email)
        cursor.execute("SELECT * FROM users WHERE id > %s", (0,))
        results = cursor.fetchall()

        for row in results:
            print(f"ID: {row[0]}, 用户名: {row[1]}, 邮箱: {row[2]}")

        # 查询单条
        cursor.execute("SELECT * FROM users WHERE id = %s", (1,))
        user = cursor.fetchone()
        print(f"用户: {user}")

        # 更新数据
        cursor.execute(
            "UPDATE users SET email = %s WHERE id = %s",
            ('newemail@example.com', 1)
        )
        conn.commit()

        # 删除数据
        cursor.execute("DELETE FROM users WHERE id = %s", (5,))
        conn.commit()

finally:
    conn.close()

6.2 使用上下文管理器

Python
import pymysql
from contextlib import contextmanager

@contextmanager
def get_db_connection():
    """数据库连接上下文管理器"""
    conn = pymysql.connect(
        host='localhost',
        user='root',
        password='your_password',
        database='mydb',
        charset='utf8mb4',
        cursorclass=pymysql.cursors.DictCursor  # 返回字典格式
    )
    try:
        yield conn
    finally:
        conn.close()

@contextmanager
def get_cursor(conn):
    """游标上下文管理器"""
    cursor = conn.cursor()
    try:
        yield cursor
        conn.commit()
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        cursor.close()

# 使用示例
with get_db_connection() as conn:
    with get_cursor(conn) as cursor:
        cursor.execute("SELECT * FROM users WHERE id = %s", (1,))
        user = cursor.fetchone()
        print(user)  # {'id': 1, 'username': '张三', ...}

6.3 使用SQLAlchemy(ORM)

安装

Bash
pip install sqlalchemy pymysql

基础使用

Python
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.orm import sessionmaker, declarative_base
from datetime import datetime

# 创建引擎
engine = create_engine(
    'mysql+pymysql://root:password@localhost:3306/mydb?charset=utf8mb4',
    echo=True  # 打印SQL语句
)

# 声明基类
Base = declarative_base()

# 定义模型
class User(Base):
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(50), nullable=False)
    email = Column(String(100), unique=True)
    created_at = Column(DateTime, default=datetime.now)

    def __repr__(self):
        return f"<User(id={self.id}, username='{self.username}')>"

# 创建表
Base.metadata.create_all(engine)

# 创建会话
Session = sessionmaker(bind=engine)
session = Session()

# 增
try:
    new_user = User(username='张三', email='zhangsan@example.com')
    session.add(new_user)
    session.commit()
    print(f"创建用户: {new_user.id}")
except Exception as e:
    session.rollback()
    print(f"错误: {e}")

# 查
user = session.query(User).filter_by(username='张三').first()
print(f"查询结果: {user}")

# 查所有
users = session.query(User).all()
for u in users:
    print(u)

# 条件查询
users = session.query(User).filter(User.id > 0).order_by(User.created_at.desc()).limit(10).all()

# 改
user = session.query(User).filter_by(username='张三').first()
if user:
    user.email = 'newemail@example.com'
    session.commit()

# 删
user = session.query(User).filter_by(username='张三').first()
if user:
    session.delete(user)
    session.commit()

# 关闭会话
session.close()

6.4 连接池

Python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool

# 使用连接池
engine = create_engine(
    'mysql+pymysql://root:password@localhost:3306/mydb',
    poolclass=QueuePool,
    pool_size=10,           # 连接池大小
    max_overflow=20,        # 最大溢出连接
    pool_recycle=3600,      # 连接回收时间(秒)
    pool_pre_ping=True      # 连接前ping测试
)

Session = sessionmaker(bind=engine)

7. 实战:用户管理系统

7.1 需求分析

功能需求: - 用户注册/登录 - 用户信息管理 - 用户角色权限 - 操作日志记录

数据库设计

SQL
-- 用户表
CREATE TABLE users (
    user_id INT PRIMARY KEY AUTO_INCREMENT,
    username VARCHAR(50) NOT NULL UNIQUE,
    password_hash VARCHAR(255) NOT NULL,
    email VARCHAR(100) UNIQUE,
    phone VARCHAR(20),
    status ENUM('active', 'inactive', 'banned') DEFAULT 'active',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    INDEX idx_username (username),
    INDEX idx_email (email)
) ENGINE=InnoDB;

-- 角色表
CREATE TABLE roles (
    role_id INT PRIMARY KEY AUTO_INCREMENT,
    role_name VARCHAR(50) NOT NULL UNIQUE,
    description VARCHAR(255)
) ENGINE=InnoDB;

-- 用户角色关联表
CREATE TABLE user_roles (
    user_id INT,
    role_id INT,
    granted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (user_id, role_id),
    FOREIGN KEY (user_id) REFERENCES users(user_id) ON DELETE CASCADE,
    FOREIGN KEY (role_id) REFERENCES roles(role_id) ON DELETE CASCADE
) ENGINE=InnoDB;

-- 操作日志表
CREATE TABLE operation_logs (
    log_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    user_id INT,
    operation VARCHAR(50),
    target_type VARCHAR(50),
    target_id VARCHAR(50),
    details JSON,
    ip_address VARCHAR(45),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES users(user_id),
    INDEX idx_user_id (user_id),
    INDEX idx_created_at (created_at)
) ENGINE=InnoDB;

-- 初始化角色数据
INSERT INTO roles (role_name, description) VALUES
('admin', '管理员'),
('user', '普通用户'),
('guest', '访客');

7.2 Python实现

Python
import pymysql
import hashlib
import secrets
import json
from datetime import datetime
from contextlib import contextmanager

class UserManager:
    def __init__(self, host='localhost', user='root', password='', database='mydb'):
        self.config = {
            'host': host,
            'user': user,
            'password': password,
            'database': database,
            'charset': 'utf8mb4',
            'cursorclass': pymysql.cursors.DictCursor
        }

    @contextmanager
    def _get_connection(self):
        conn = pymysql.connect(**self.config)
        try:  # try/except捕获异常
            yield conn  # yield生成器:惰性产出值,节省内存
        finally:
            conn.close()

    def _hash_password(self, password: str, salt: str = None) -> tuple:
        """密码哈希"""
        if salt is None:
            salt = secrets.token_hex(16)
        hash_value = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000)
        return hash_value.hex(), salt

    def _verify_password(self, password: str, hash_value: str, salt: str) -> bool:
        """验证密码"""
        computed_hash, _ = self._hash_password(password, salt)
        return computed_hash == hash_value

    def register(self, username: str, password: str, email: str = None, phone: str = None) -> dict:
        """用户注册"""
        password_hash, salt = self._hash_password(password)
        full_hash = f"{salt}${password_hash}"

        with self._get_connection() as conn:
            with conn.cursor() as cursor:
                try:
                    cursor.execute(
                        "INSERT INTO users (username, password_hash, email, phone) VALUES (%s, %s, %s, %s)",
                        (username, full_hash, email, phone)
                    )
                    user_id = cursor.lastrowid

                    # 分配默认角色
                    cursor.execute(
                        "INSERT INTO user_roles (user_id, role_id) VALUES (%s, (SELECT role_id FROM roles WHERE role_name = 'user'))",
                        (user_id,)
                    )

                    conn.commit()

                    self._log_operation(user_id, 'REGISTER', 'user', str(user_id), {'username': username})

                    return {'success': True, 'user_id': user_id, 'message': '注册成功'}

                except pymysql.err.IntegrityError as e:
                    if 'username' in str(e):
                        return {'success': False, 'message': '用户名已存在'}
                    elif 'email' in str(e):
                        return {'success': False, 'message': '邮箱已被使用'}
                    raise

    def login(self, username: str, password: str, ip_address: str = None) -> dict:
        """用户登录"""
        with self._get_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    "SELECT user_id, password_hash, status FROM users WHERE username = %s",
                    (username,)
                )
                user = cursor.fetchone()

                if not user:
                    return {'success': False, 'message': '用户名或密码错误'}

                if user['status'] == 'banned':
                    return {'success': False, 'message': '账号已被禁用'}

                # 验证密码
                salt, hash_value = user['password_hash'].split('$')
                if not self._verify_password(password, hash_value, salt):
                    self._log_operation(user['user_id'], 'LOGIN_FAILED', 'user', str(user['user_id']),
                                      {'username': username, 'ip': ip_address})
                    return {'success': False, 'message': '用户名或密码错误'}

                # 获取用户角色
                cursor.execute("""
                    SELECT r.role_name FROM roles r
                    JOIN user_roles ur ON r.role_id = ur.role_id
                    WHERE ur.user_id = %s
                """, (user['user_id'],))
                roles = [row['role_name'] for row in cursor.fetchall()]

                self._log_operation(user['user_id'], 'LOGIN', 'user', str(user['user_id']),
                                  {'username': username, 'ip': ip_address})

                return {
                    'success': True,
                    'user_id': user['user_id'],
                    'username': username,
                    'roles': roles,
                    'message': '登录成功'
                }

    def get_user(self, user_id: int) -> dict:
        """获取用户信息"""
        with self._get_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    "SELECT user_id, username, email, phone, status, created_at FROM users WHERE user_id = %s",
                    (user_id,)
                )
                return cursor.fetchone()

    def update_user(self, user_id: int, **kwargs) -> dict:  # *args接收任意位置参数;**kwargs接收任意关键字参数
        """更新用户信息"""
        allowed_fields = ['email', 'phone', 'status']
        updates = {k: v for k, v in kwargs.items() if k in allowed_fields}

        if not updates:
            return {'success': False, 'message': '没有可更新的字段'}

        set_clause = ', '.join([f"{k} = %s" for k in updates.keys()])
        values = list(updates.values()) + [user_id]

        with self._get_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute(f"UPDATE users SET {set_clause} WHERE user_id = %s", values)
                conn.commit()

                self._log_operation(user_id, 'UPDATE_USER', 'user', str(user_id), updates)

                return {'success': True, 'message': '更新成功'}

    def delete_user(self, user_id: int) -> dict:
        """删除用户"""
        with self._get_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute("DELETE FROM users WHERE user_id = %s", (user_id,))
                conn.commit()

                self._log_operation(user_id, 'DELETE_USER', 'user', str(user_id), {})

                return {'success': True, 'message': '删除成功'}

    def list_users(self, page: int = 1, page_size: int = 10, status: str = None) -> dict:
        """用户列表"""
        with self._get_connection() as conn:
            with conn.cursor() as cursor:
                where_clause = "WHERE status = %s" if status else ""
                params = [status] if status else []

                # 查询总数
                cursor.execute(f"SELECT COUNT(*) as total FROM users {where_clause}", params)
                total = cursor.fetchone()['total']

                # 查询数据
                offset = (page - 1) * page_size
                cursor.execute(f"""
                    SELECT user_id, username, email, phone, status, created_at
                    FROM users {where_clause}
                    ORDER BY created_at DESC
                    LIMIT %s OFFSET %s
                """, params + [page_size, offset])

                users = cursor.fetchall()

                return {
                    'success': True,
                    'total': total,
                    'page': page,
                    'page_size': page_size,
                    'users': users
                }

    def _log_operation(self, user_id: int, operation: str, target_type: str,
                      target_id: str, details: dict, ip_address: str = None):
        """记录操作日志"""
        with self._get_connection() as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    "INSERT INTO operation_logs (user_id, operation, target_type, target_id, details, ip_address) VALUES (%s, %s, %s, %s, %s, %s)",
                    (user_id, operation, target_type, target_id, json.dumps(details), ip_address)  # json.dumps将Python对象转为JSON字符串
                )
                conn.commit()

# 使用示例
if __name__ == '__main__':
    um = UserManager(password='your_password')

    # 注册
    result = um.register('testuser', 'password123', 'test@example.com')
    print(result)

    # 登录
    result = um.login('testuser', 'password123')
    print(result)

    # 获取用户列表
    result = um.list_users(page=1, page_size=5)
    print(result)

🎯 本章自测

安装配置

  1. 在你的电脑上安装MySQL,并创建一个新的数据库

  2. 配置MySQL允许远程连接(注意安全)

SQL操作

  1. 创建一个电商数据库,包含以下表
  2. 商品表(products):id, name, price, stock, category_id
  3. 分类表(categories):id, name
  4. 订单表(orders):id, user_id, total_amount, status, created_at
  5. 订单项表(order_items):id, order_id, product_id, quantity, price

  6. 插入测试数据,并编写以下查询

  7. 查询每个分类的商品数量
  8. 查询销量最高的前10个商品
  9. 查询某用户的订单历史

Python集成

  1. 使用Python连接MySQL,实现以下功能
  2. 商品CRUD操作
  3. 订单创建和查询
  4. 库存管理(下单时扣减库存)

备份恢复

  1. 编写一个备份脚本,每天自动备份数据库

📚 扩展阅读

推荐资源

下一步

完成本章后,继续学习 第05章:PostgreSQL进阶,了解功能更强大的PostgreSQL!


本章完 | 预计学习时间:4-5小时