04-MySQL实战¶
最流行的开源关系型数据库 目标:掌握MySQL的安装配置、日常操作和Python集成
📋 本章概览¶
预计学习时间:4-5小时 前置章节:第01章:数据库基础概念、第02章:SQL核心语法、第03章:数据库设计与范式 实践要求:完成本章所有动手操作,搭建一个完整的用户管理系统
本章内容: 1. MySQL安装与配置 2. MySQL数据类型详解 3. 存储引擎选择 4. 用户权限管理 5. 备份与恢复 6. Python操作MySQL 7. 实战:用户管理系统
1. MySQL安装与配置¶
1.1 Windows安装¶
方式1:官方安装包
Bash
# 1. 下载MySQL Installer
# https://dev.mysql.com/downloads/installer/
# 2. 选择安装类型:
# - Server only(仅服务器)
# - Full(完整安装,包含Workbench等工具)
# 3. 配置root密码
# 4. 记住端口号(默认3306)
方式2:使用Chocolatey
1.2 macOS安装¶
Bash
# 使用Homebrew安装
brew install mysql
# 启动服务
brew services start mysql
# 初始化(设置root密码)
mysql_secure_installation
1.3 Linux安装(Ubuntu)¶
Bash
# 安装
sudo apt update
sudo apt install mysql-server
# 启动服务
sudo systemctl start mysql
sudo systemctl enable mysql
# 安全配置
sudo mysql_secure_installation
1.4 Docker安装(推荐)¶
Bash
# 拉取镜像
docker pull mysql:8.0
# 运行容器
docker run -d \
--name mysql8 \
-p 3306:3306 \
-e MYSQL_ROOT_PASSWORD=your_password \
-v mysql_data:/var/lib/mysql \
mysql:8.0
# 进入容器
docker exec -it mysql8 mysql -uroot -p
1.5 基本配置¶
配置文件位置: - Windows: C:\ProgramData\MySQL\MySQL Server 8.0\my.ini - Linux/macOS: /etc/mysql/my.cnf 或 /etc/my.cnf
常用配置:
INI
[mysqld]
# 端口号
port=3306
# 数据目录
datadir=/var/lib/mysql
# 字符集
character-set-server=utf8mb4
collation-server=utf8mb4_unicode_ci
# 最大连接数
max_connections=200
# 缓冲区大小
innodb_buffer_pool_size=1G
# 慢查询日志
slow_query_log=1
slow_query_log_file=/var/log/mysql/slow.log
long_query_time=2
# 错误日志
log_error=/var/log/mysql/error.log
1.6 连接MySQL¶
命令行连接:
Bash
# 本地连接
mysql -u root -p
# 远程连接
mysql -h 192.168.1.100 -P 3306 -u root -p
# 指定数据库
mysql -u root -p database_name
常用命令:
SQL
-- 查看版本
SELECT VERSION();
-- 查看所有数据库
SHOW DATABASES;
-- 创建数据库
CREATE DATABASE mydb CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
-- 使用数据库
USE mydb;
-- 查看当前数据库
SELECT DATABASE();
-- 查看所有表
SHOW TABLES;
-- 查看表结构
DESCRIBE users;
SHOW CREATE TABLE users;
-- 查看当前用户
SELECT USER();
-- 查看所有用户
SELECT user, host FROM mysql.user;
2. MySQL数据类型详解¶
2.1 数值类型¶
| 类型 | 大小 | 范围 | 用途 |
|---|---|---|---|
| TINYINT | 1字节 | -128~127 | 小整数 |
| SMALLINT | 2字节 | -32768~32767 | 较小整数 |
| MEDIUMINT | 3字节 | -838万~838万 | 中等整数 |
| INT | 4字节 | -21亿~21亿 | 标准整数 |
| BIGINT | 8字节 | 极大范围 | 大整数 |
| FLOAT | 4字节 | - | 单精度浮点 |
| DOUBLE | 8字节 | - | 双精度浮点 |
| DECIMAL(M,D) | 变长 | 依赖M,D | 精确小数 |
SQL
-- 示例
CREATE TABLE products (
product_id INT PRIMARY KEY AUTO_INCREMENT,
stock TINYINT UNSIGNED, -- 库存:0-255
view_count INT UNSIGNED, -- 浏览量:0-42亿
price DECIMAL(10,2), -- 价格:最大99999999.99
weight FLOAT -- 重量:近似值
);
2.2 字符串类型¶
| 类型 | 最大长度 | 特点 | 用途 |
|---|---|---|---|
| CHAR(N) | 255字符 | 定长,速度快 | 固定长度(如手机号) |
| VARCHAR(N) | 65535字节 | 变长,省空间 | 大多数字符串 |
| TINYTEXT | 255字节 | - | 短文本 |
| TEXT | 64KB | - | 文章、描述 |
| MEDIUMTEXT | 16MB | - | 长文本 |
| LONGTEXT | 4GB | - | 极大文本 |
| JSON | - | JSON格式 | 结构化数据 |
SQL
-- 示例
CREATE TABLE articles (
article_id INT PRIMARY KEY AUTO_INCREMENT,
title VARCHAR(200) NOT NULL, -- 标题,最多200字符
summary VARCHAR(500), -- 摘要
content TEXT, -- 正文
tags JSON, -- 标签:["AI", "Python"]
phone CHAR(11) -- 手机号,定长11位
);
2.3 日期时间类型¶
| 类型 | 格式 | 范围 | 用途 |
|---|---|---|---|
| DATE | YYYY-MM-DD | 1000-9999 | 日期 |
| TIME | HH:MM:SS | -838:59:59 ~ 838:59:59 | 时间 |
| DATETIME | YYYY-MM-DD HH:MM:SS | 1000-9999 | 日期时间 |
| TIMESTAMP | YYYY-MM-DD HH:MM:SS | 1970-2038 | 时间戳 |
| YEAR | YYYY | 1901-2155 | 年份 |
SQL
-- 示例
CREATE TABLE events (
event_id INT PRIMARY KEY AUTO_INCREMENT,
event_name VARCHAR(100),
event_date DATE, -- 如:2024-01-15
start_time TIME, -- 如:09:00:00
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- 时间函数
SELECT NOW(); -- 当前日期时间
SELECT CURDATE(); -- 当前日期
SELECT CURTIME(); -- 当前时间
SELECT DATE_FORMAT(created_at, '%Y-%m-%d') FROM events;
SELECT DATEDIFF('2024-12-31', '2024-01-01'); -- 日期差
2.4 其他类型¶
SQL
-- 枚举类型
CREATE TABLE users (
gender ENUM('男', '女', '保密') DEFAULT '保密',
status ENUM('active', 'inactive', 'banned') DEFAULT 'active'
);
-- 布尔类型(实际用TINYINT(1))
CREATE TABLE tasks (
is_completed BOOLEAN DEFAULT FALSE,
is_urgent BOOL DEFAULT FALSE
);
-- 二进制数据
CREATE TABLE files (
file_name VARCHAR(255),
file_data BLOB, -- 二进制大对象
file_size INT
);
3. 存储引擎¶
3.1 常用存储引擎对比¶
| 特性 | InnoDB | MyISAM | Memory |
|---|---|---|---|
| 事务支持 | ✅ | ❌ | ❌ |
| 行级锁 | ✅ | ❌(表锁) | ❌(表锁) |
| 外键 | ✅ | ❌ | ❌ |
| 崩溃恢复 | ✅ | ❌ | ❌ |
| 全文索引 | ✅(5.6+) | ✅ | ❌ |
| 适用场景 | 默认、通用 | 读多写少 | 临时数据 |
3.2 InnoDB详解¶
特点: - MySQL 5.5+ 的默认引擎 - 支持事务(ACID) - 支持行级锁(并发性能好) - 支持外键约束 - 支持崩溃恢复
SQL
-- 创建InnoDB表(默认)
CREATE TABLE users (
user_id INT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(50)
) ENGINE=InnoDB;
-- 查看表的存储引擎
SHOW TABLE STATUS WHERE Name = 'users';
-- 修改存储引擎
ALTER TABLE users ENGINE=InnoDB;
3.3 存储引擎选择建议¶
- 默认选择InnoDB:满足99%的场景
- MyISAM:只读或读多写少的报表系统
- Memory:临时表、缓存表(重启丢失)
- Archive:归档历史数据
4. 用户权限管理¶
4.1 创建用户¶
SQL
-- 创建本地用户
CREATE USER 'zhangsan'@'localhost' IDENTIFIED BY 'password123';
-- 创建远程用户(指定IP)
CREATE USER 'lisi'@'192.168.1.%' IDENTIFIED BY 'password123';
-- 创建任意主机用户(不推荐)
CREATE USER 'wangwu'@'%' IDENTIFIED BY 'password123';
-- 修改密码
ALTER USER 'zhangsan'@'localhost' IDENTIFIED BY 'newpassword';
-- 删除用户
DROP USER 'zhangsan'@'localhost';
4.2 授权管理¶
SQL
-- 授权语法
GRANT 权限 ON 数据库.表 TO '用户'@'主机';
-- 常用权限
-- ALL PRIVILEGES:所有权限
-- SELECT:查询
-- INSERT:插入
-- UPDATE:更新
-- DELETE:删除
-- CREATE:创建表/数据库
-- DROP:删除表/数据库
-- INDEX:创建/删除索引
-- 示例1:授予只读权限
GRANT SELECT ON mydb.* TO 'readonly'@'%';
-- 示例2:授予读写权限
GRANT SELECT, INSERT, UPDATE, DELETE ON mydb.* TO 'readwrite'@'%';
-- 示例3:授予特定表权限
GRANT SELECT, INSERT ON mydb.users TO 'app_user'@'%';
-- 示例4:授予所有权限(管理员)
GRANT ALL PRIVILEGES ON *.* TO 'admin'@'%' WITH GRANT OPTION;
-- 刷新权限(立即生效)
FLUSH PRIVILEGES;
-- 查看用户权限
SHOW GRANTS FOR 'zhangsan'@'localhost';
-- 撤销权限
REVOKE DELETE ON mydb.* FROM 'readwrite'@'%';
4.3 角色管理(MySQL 8.0+)¶
SQL
-- 创建角色
CREATE ROLE 'app_read', 'app_write', 'app_admin';
-- 给角色授权
GRANT SELECT ON mydb.* TO 'app_read';
GRANT SELECT, INSERT, UPDATE, DELETE ON mydb.* TO 'app_write';
GRANT ALL PRIVILEGES ON mydb.* TO 'app_admin';
-- 给用户分配角色
GRANT 'app_read' TO 'zhangsan'@'localhost';
GRANT 'app_write' TO 'lisi'@'localhost';
-- 设置默认角色
SET DEFAULT ROLE 'app_read' TO 'zhangsan'@'localhost';
-- 激活角色
SET ROLE 'app_read';
-- 查看当前角色
SELECT CURRENT_ROLE();
5. 备份与恢复¶
5.1 使用mysqldump备份¶
Bash
# 备份单个数据库
mysqldump -u root -p mydb > mydb_backup.sql
# 备份多个数据库
mysqldump -u root -p --databases db1 db2 > backup.sql
# 备份所有数据库
mysqldump -u root -p --all-databases > all_backup.sql
# 仅备份表结构(不含数据)
mysqldump -u root -p --no-data mydb > schema.sql
# 仅备份数据(不含结构)
mysqldump -u root -p --no-create-info mydb > data.sql
# 压缩备份
mysqldump -u root -p mydb | gzip > mydb_backup.sql.gz
5.2 恢复数据¶
Bash
# 恢复数据库
mysql -u root -p mydb < mydb_backup.sql
# 恢复前创建数据库
mysql -u root -p -e "CREATE DATABASE IF NOT EXISTS mydb;"
mysql -u root -p mydb < mydb_backup.sql
# 解压并恢复
gunzip < mydb_backup.sql.gz | mysql -u root -p mydb
5.3 物理备份(InnoDB)¶
Bash
# 使用Percona XtraBackup(热备份,不锁表)
# 安装:https://www.percona.com/software/mysql-database/percona-xtrabackup
# 全量备份
xtrabackup --backup --target-dir=/backup/full
# 增量备份
xtrabackup --backup --target-dir=/backup/inc1 --incremental-basedir=/backup/full
# 恢复
xtrabackup --prepare --target-dir=/backup/full
xtrabackup --copy-back --target-dir=/backup/full
5.4 定时备份脚本¶
Bash
#!/bin/bash
# backup.sh
DB_NAME="mydb"
BACKUP_DIR="/backup/mysql"
DATE=$(date +%Y%m%d_%H%M%S) # $()命令替换:执行命令并获取输出
BACKUP_FILE="$BACKUP_DIR/${DB_NAME}_$DATE.sql.gz"
# 保留最近7天的备份
RETENTION_DAYS=7
# 执行备份(使用 --defaults-extra-file 避免密码暴露在命令行)
# 创建 /root/.my_backup.cnf 内容:[mysqldump] user=root password=xxx
# chmod 600 /root/.my_backup.cnf
mysqldump --defaults-extra-file=/root/.my_backup.cnf $DB_NAME | gzip > $BACKUP_FILE # |管道:将前一命令的输出作为后一命令的输入
# 检查备份是否成功
if [ $? -eq 0 ]; then # 条件测试:-f文件存在 -d目录存在 -z空字符串
echo "Backup successful: $BACKUP_FILE"
else
echo "Backup failed!"
exit 1
fi
# 删除旧备份
find $BACKUP_DIR -name "${DB_NAME}_*.sql.gz" -mtime +$RETENTION_DAYS -delete
echo "Cleanup completed"
添加到crontab:
6. Python操作MySQL¶
6.1 使用PyMySQL¶
安装:
基础操作:
Python
import pymysql
# 建立连接
conn = pymysql.connect(
host='localhost',
port=3306,
user='root',
password='your_password',
database='mydb',
charset='utf8mb4'
)
try:
# 创建游标
with conn.cursor() as cursor:
# 执行SQL
cursor.execute("SELECT VERSION()")
result = cursor.fetchone()
print(f"MySQL版本: {result[0]}")
# 创建表
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) UNIQUE,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# 插入数据
cursor.execute(
"INSERT INTO users (username, email) VALUES (%s, %s)",
('张三', 'zhangsan@example.com')
)
# 批量插入
users = [
('李四', 'lisi@example.com'),
('王五', 'wangwu@example.com'),
('赵六', 'zhaoliu@example.com')
]
cursor.executemany(
"INSERT INTO users (username, email) VALUES (%s, %s)",
users
)
# 提交事务
conn.commit()
# 查询数据
# ⚠️ 生产环境应避免 SELECT *,明确指定需要的列(如 SELECT id, username, email)
cursor.execute("SELECT * FROM users WHERE id > %s", (0,))
results = cursor.fetchall()
for row in results:
print(f"ID: {row[0]}, 用户名: {row[1]}, 邮箱: {row[2]}")
# 查询单条
cursor.execute("SELECT * FROM users WHERE id = %s", (1,))
user = cursor.fetchone()
print(f"用户: {user}")
# 更新数据
cursor.execute(
"UPDATE users SET email = %s WHERE id = %s",
('newemail@example.com', 1)
)
conn.commit()
# 删除数据
cursor.execute("DELETE FROM users WHERE id = %s", (5,))
conn.commit()
finally:
conn.close()
6.2 使用上下文管理器¶
Python
import pymysql
from contextlib import contextmanager
@contextmanager
def get_db_connection():
"""数据库连接上下文管理器"""
conn = pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='mydb',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor # 返回字典格式
)
try:
yield conn
finally:
conn.close()
@contextmanager
def get_cursor(conn):
"""游标上下文管理器"""
cursor = conn.cursor()
try:
yield cursor
conn.commit()
except Exception as e:
conn.rollback()
raise e
finally:
cursor.close()
# 使用示例
with get_db_connection() as conn:
with get_cursor(conn) as cursor:
cursor.execute("SELECT * FROM users WHERE id = %s", (1,))
user = cursor.fetchone()
print(user) # {'id': 1, 'username': '张三', ...}
6.3 使用SQLAlchemy(ORM)¶
安装:
基础使用:
Python
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.orm import sessionmaker, declarative_base
from datetime import datetime
# 创建引擎
engine = create_engine(
'mysql+pymysql://root:password@localhost:3306/mydb?charset=utf8mb4',
echo=True # 打印SQL语句
)
# 声明基类
Base = declarative_base()
# 定义模型
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(50), nullable=False)
email = Column(String(100), unique=True)
created_at = Column(DateTime, default=datetime.now)
def __repr__(self):
return f"<User(id={self.id}, username='{self.username}')>"
# 创建表
Base.metadata.create_all(engine)
# 创建会话
Session = sessionmaker(bind=engine)
session = Session()
# 增
try:
new_user = User(username='张三', email='zhangsan@example.com')
session.add(new_user)
session.commit()
print(f"创建用户: {new_user.id}")
except Exception as e:
session.rollback()
print(f"错误: {e}")
# 查
user = session.query(User).filter_by(username='张三').first()
print(f"查询结果: {user}")
# 查所有
users = session.query(User).all()
for u in users:
print(u)
# 条件查询
users = session.query(User).filter(User.id > 0).order_by(User.created_at.desc()).limit(10).all()
# 改
user = session.query(User).filter_by(username='张三').first()
if user:
user.email = 'newemail@example.com'
session.commit()
# 删
user = session.query(User).filter_by(username='张三').first()
if user:
session.delete(user)
session.commit()
# 关闭会话
session.close()
6.4 连接池¶
Python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import QueuePool
# 使用连接池
engine = create_engine(
'mysql+pymysql://root:password@localhost:3306/mydb',
poolclass=QueuePool,
pool_size=10, # 连接池大小
max_overflow=20, # 最大溢出连接
pool_recycle=3600, # 连接回收时间(秒)
pool_pre_ping=True # 连接前ping测试
)
Session = sessionmaker(bind=engine)
7. 实战:用户管理系统¶
7.1 需求分析¶
功能需求: - 用户注册/登录 - 用户信息管理 - 用户角色权限 - 操作日志记录
数据库设计:
SQL
-- 用户表
CREATE TABLE users (
user_id INT PRIMARY KEY AUTO_INCREMENT,
username VARCHAR(50) NOT NULL UNIQUE,
password_hash VARCHAR(255) NOT NULL,
email VARCHAR(100) UNIQUE,
phone VARCHAR(20),
status ENUM('active', 'inactive', 'banned') DEFAULT 'active',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
INDEX idx_username (username),
INDEX idx_email (email)
) ENGINE=InnoDB;
-- 角色表
CREATE TABLE roles (
role_id INT PRIMARY KEY AUTO_INCREMENT,
role_name VARCHAR(50) NOT NULL UNIQUE,
description VARCHAR(255)
) ENGINE=InnoDB;
-- 用户角色关联表
CREATE TABLE user_roles (
user_id INT,
role_id INT,
granted_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (user_id, role_id),
FOREIGN KEY (user_id) REFERENCES users(user_id) ON DELETE CASCADE,
FOREIGN KEY (role_id) REFERENCES roles(role_id) ON DELETE CASCADE
) ENGINE=InnoDB;
-- 操作日志表
CREATE TABLE operation_logs (
log_id BIGINT PRIMARY KEY AUTO_INCREMENT,
user_id INT,
operation VARCHAR(50),
target_type VARCHAR(50),
target_id VARCHAR(50),
details JSON,
ip_address VARCHAR(45),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users(user_id),
INDEX idx_user_id (user_id),
INDEX idx_created_at (created_at)
) ENGINE=InnoDB;
-- 初始化角色数据
INSERT INTO roles (role_name, description) VALUES
('admin', '管理员'),
('user', '普通用户'),
('guest', '访客');
7.2 Python实现¶
Python
import pymysql
import hashlib
import secrets
import json
from datetime import datetime
from contextlib import contextmanager
class UserManager:
def __init__(self, host='localhost', user='root', password='', database='mydb'):
self.config = {
'host': host,
'user': user,
'password': password,
'database': database,
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor
}
@contextmanager
def _get_connection(self):
conn = pymysql.connect(**self.config)
try: # try/except捕获异常
yield conn # yield生成器:惰性产出值,节省内存
finally:
conn.close()
def _hash_password(self, password: str, salt: str = None) -> tuple:
"""密码哈希"""
if salt is None:
salt = secrets.token_hex(16)
hash_value = hashlib.pbkdf2_hmac('sha256', password.encode(), salt.encode(), 100000)
return hash_value.hex(), salt
def _verify_password(self, password: str, hash_value: str, salt: str) -> bool:
"""验证密码"""
computed_hash, _ = self._hash_password(password, salt)
return computed_hash == hash_value
def register(self, username: str, password: str, email: str = None, phone: str = None) -> dict:
"""用户注册"""
password_hash, salt = self._hash_password(password)
full_hash = f"{salt}${password_hash}"
with self._get_connection() as conn:
with conn.cursor() as cursor:
try:
cursor.execute(
"INSERT INTO users (username, password_hash, email, phone) VALUES (%s, %s, %s, %s)",
(username, full_hash, email, phone)
)
user_id = cursor.lastrowid
# 分配默认角色
cursor.execute(
"INSERT INTO user_roles (user_id, role_id) VALUES (%s, (SELECT role_id FROM roles WHERE role_name = 'user'))",
(user_id,)
)
conn.commit()
self._log_operation(user_id, 'REGISTER', 'user', str(user_id), {'username': username})
return {'success': True, 'user_id': user_id, 'message': '注册成功'}
except pymysql.err.IntegrityError as e:
if 'username' in str(e):
return {'success': False, 'message': '用户名已存在'}
elif 'email' in str(e):
return {'success': False, 'message': '邮箱已被使用'}
raise
def login(self, username: str, password: str, ip_address: str = None) -> dict:
"""用户登录"""
with self._get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(
"SELECT user_id, password_hash, status FROM users WHERE username = %s",
(username,)
)
user = cursor.fetchone()
if not user:
return {'success': False, 'message': '用户名或密码错误'}
if user['status'] == 'banned':
return {'success': False, 'message': '账号已被禁用'}
# 验证密码
salt, hash_value = user['password_hash'].split('$')
if not self._verify_password(password, hash_value, salt):
self._log_operation(user['user_id'], 'LOGIN_FAILED', 'user', str(user['user_id']),
{'username': username, 'ip': ip_address})
return {'success': False, 'message': '用户名或密码错误'}
# 获取用户角色
cursor.execute("""
SELECT r.role_name FROM roles r
JOIN user_roles ur ON r.role_id = ur.role_id
WHERE ur.user_id = %s
""", (user['user_id'],))
roles = [row['role_name'] for row in cursor.fetchall()]
self._log_operation(user['user_id'], 'LOGIN', 'user', str(user['user_id']),
{'username': username, 'ip': ip_address})
return {
'success': True,
'user_id': user['user_id'],
'username': username,
'roles': roles,
'message': '登录成功'
}
def get_user(self, user_id: int) -> dict:
"""获取用户信息"""
with self._get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(
"SELECT user_id, username, email, phone, status, created_at FROM users WHERE user_id = %s",
(user_id,)
)
return cursor.fetchone()
def update_user(self, user_id: int, **kwargs) -> dict: # *args接收任意位置参数;**kwargs接收任意关键字参数
"""更新用户信息"""
allowed_fields = ['email', 'phone', 'status']
updates = {k: v for k, v in kwargs.items() if k in allowed_fields}
if not updates:
return {'success': False, 'message': '没有可更新的字段'}
set_clause = ', '.join([f"{k} = %s" for k in updates.keys()])
values = list(updates.values()) + [user_id]
with self._get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(f"UPDATE users SET {set_clause} WHERE user_id = %s", values)
conn.commit()
self._log_operation(user_id, 'UPDATE_USER', 'user', str(user_id), updates)
return {'success': True, 'message': '更新成功'}
def delete_user(self, user_id: int) -> dict:
"""删除用户"""
with self._get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute("DELETE FROM users WHERE user_id = %s", (user_id,))
conn.commit()
self._log_operation(user_id, 'DELETE_USER', 'user', str(user_id), {})
return {'success': True, 'message': '删除成功'}
def list_users(self, page: int = 1, page_size: int = 10, status: str = None) -> dict:
"""用户列表"""
with self._get_connection() as conn:
with conn.cursor() as cursor:
where_clause = "WHERE status = %s" if status else ""
params = [status] if status else []
# 查询总数
cursor.execute(f"SELECT COUNT(*) as total FROM users {where_clause}", params)
total = cursor.fetchone()['total']
# 查询数据
offset = (page - 1) * page_size
cursor.execute(f"""
SELECT user_id, username, email, phone, status, created_at
FROM users {where_clause}
ORDER BY created_at DESC
LIMIT %s OFFSET %s
""", params + [page_size, offset])
users = cursor.fetchall()
return {
'success': True,
'total': total,
'page': page,
'page_size': page_size,
'users': users
}
def _log_operation(self, user_id: int, operation: str, target_type: str,
target_id: str, details: dict, ip_address: str = None):
"""记录操作日志"""
with self._get_connection() as conn:
with conn.cursor() as cursor:
cursor.execute(
"INSERT INTO operation_logs (user_id, operation, target_type, target_id, details, ip_address) VALUES (%s, %s, %s, %s, %s, %s)",
(user_id, operation, target_type, target_id, json.dumps(details), ip_address) # json.dumps将Python对象转为JSON字符串
)
conn.commit()
# 使用示例
if __name__ == '__main__':
um = UserManager(password='your_password')
# 注册
result = um.register('testuser', 'password123', 'test@example.com')
print(result)
# 登录
result = um.login('testuser', 'password123')
print(result)
# 获取用户列表
result = um.list_users(page=1, page_size=5)
print(result)
🎯 本章自测¶
安装配置¶
-
在你的电脑上安装MySQL,并创建一个新的数据库
-
配置MySQL允许远程连接(注意安全)
SQL操作¶
- 创建一个电商数据库,包含以下表:
- 商品表(products):id, name, price, stock, category_id
- 分类表(categories):id, name
- 订单表(orders):id, user_id, total_amount, status, created_at
-
订单项表(order_items):id, order_id, product_id, quantity, price
-
插入测试数据,并编写以下查询:
- 查询每个分类的商品数量
- 查询销量最高的前10个商品
- 查询某用户的订单历史
Python集成¶
- 使用Python连接MySQL,实现以下功能:
- 商品CRUD操作
- 订单创建和查询
- 库存管理(下单时扣减库存)
备份恢复¶
- 编写一个备份脚本,每天自动备份数据库
📚 扩展阅读¶
推荐资源¶
下一步¶
完成本章后,继续学习 第05章:PostgreSQL进阶,了解功能更强大的PostgreSQL!
本章完 | 预计学习时间:4-5小时