跳转至

数据库集成

📖 章节简介

本章将介绍Flask-SQLAlchemy扩展,学习如何在Flask应用中集成数据库,使用ORM进行数据操作。

🗄️ SQLAlchemy基础

1. 安装和配置

Bash
# 安装Flask-SQLAlchemy
pip install flask-sqlalchemy

# 安装Flask-Migrate(数据库迁移)
pip install flask-migrate

# requirements.txt
flask-sqlalchemy==3.1.1
flask-migrate==4.0.5
Python
# config.py
import os

class Config:
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL') or \
        'sqlite:///' + os.path.join(os.path.dirname(os.path.abspath(__file__)), 'app.db')
    SQLALCHEMY_TRACK_MODIFICATIONS = False
Python
# app/__init__.py
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate

db = SQLAlchemy()
migrate = Migrate()

def create_app():
    app = Flask(__name__)
    app.config.from_object('config.Config')

    # 初始化扩展
    db.init_app(app)
    migrate.init_app(app, db)

    return app

2. 创建模型

Python
# app/models.py
from datetime import datetime, timezone
from werkzeug.security import generate_password_hash, check_password_hash
from app import db

class User(db.Model):
    """用户模型"""
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(20), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(128))
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))  # lambda匿名函数:简洁的单行函数
    is_admin = db.Column(db.Boolean, default=False)

    # 关系
    posts = db.relationship('Post', backref='author', lazy='dynamic')
    comments = db.relationship('Comment', backref='author', lazy='dynamic')

    def set_password(self, password):
        """设置密码"""
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        """验证密码"""
        return check_password_hash(self.password_hash, password)

    def __repr__(self):
        return f'<User {self.username}>'

class Post(db.Model):
    """文章模型"""
    __tablename__ = 'posts'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(100), nullable=False)
    content = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc), index=True)
    updated_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))

    # 外键
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    category_id = db.Column(db.Integer, db.ForeignKey('categories.id'))

    # 关系
    comments = db.relationship('Comment', backref='post', lazy='dynamic', cascade='all, delete-orphan')

    def __repr__(self):
        return f'<Post {self.title}>'

class Category(db.Model):
    """分类模型"""
    __tablename__ = 'categories'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    description = db.Column(db.Text)

    # 关系
    posts = db.relationship('Post', backref='category', lazy='dynamic')

    def __repr__(self):
        return f'<Category {self.name}>'

class Comment(db.Model):
    """评论模型"""
    __tablename__ = 'comments'

    id = db.Column(db.Integer, primary_key=True)
    content = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))

    # 外键
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'))
    post_id = db.Column(db.Integer, db.ForeignKey('posts.id'))

    def __repr__(self):
        return f'<Comment {self.id}>'

🔍 数据库操作

💡 SQLAlchemy 2.0 风格说明:Flask-SQLAlchemy 3.x 仍支持传统的 Model.query 接口,但 SQLAlchemy 2.0 推荐使用 db.session.execute(db.select(...)) 风格。下方示例同时展示两种写法,新项目建议使用 2.0 风格。

1. 基本CRUD操作

Python
# 创建(Create)
def create_user(username, email, password):
    user = User(username=username, email=email)
    user.set_password(password)
    db.session.add(user)
    db.session.commit()
    return user

# 读取(Read)
def get_user(user_id):
    user = db.session.get(User, user_id)
    return user

def get_user_by_username(username):
    # 传统写法(Flask-SQLAlchemy 3.x 仍支持):
    # user = User.query.filter_by(username=username).first()
    # SQLAlchemy 2.0 推荐写法:
    user = db.session.execute(
        db.select(User).filter_by(username=username)
    ).scalar_one_or_none()
    return user

def get_all_users():
    # 传统写法: User.query.all()
    # SQLAlchemy 2.0 推荐写法:
    users = db.session.execute(db.select(User)).scalars().all()
    return users

# 更新(Update)
def update_user(user_id, **kwargs):  # *args接收任意位置参数;**kwargs接收任意关键字参数
    user = db.session.get(User, user_id)
    if user:
        for key, value in kwargs.items():
            if hasattr(user, key):  # hasattr/getattr/setattr动态操作对象属性
                setattr(user, key, value)
        db.session.commit()
    return user

# 删除(Delete)
def delete_user(user_id):
    user = db.session.get(User, user_id)
    if user:
        db.session.delete(user)
        db.session.commit()
    return user

2. 高级查询

Python
# 复杂查询示例
def get_recent_posts(limit=10):
    """获取最新文章(SQLAlchemy 2.0 风格)"""
    posts = db.session.execute(
        db.select(Post).order_by(Post.created_at.desc()).limit(limit)
    ).scalars().all()
    return posts

def search_posts(keyword):
    """搜索文章(SQLAlchemy 2.0 风格)"""
    posts = db.session.execute(
        db.select(Post).filter(
            Post.title.contains(keyword) | Post.content.contains(keyword)
        )
    ).scalars().all()
    return posts

def get_user_posts(user_id, page=1, per_page=10):
    """分页获取用户文章(SQLAlchemy 2.0 风格)"""
    pagination = db.paginate(
        db.select(Post).filter_by(user_id=user_id)
          .order_by(Post.created_at.desc()),
        page=page, per_page=per_page, error_out=False
    )
    return pagination

def get_category_posts(category_id, page=1, per_page=10):
    """分页获取分类文章(SQLAlchemy 2.0 风格)"""
    pagination = db.paginate(
        db.select(Post).filter_by(category_id=category_id)
          .order_by(Post.created_at.desc()),
        page=page, per_page=per_page, error_out=False
    )
    return pagination

def get_popular_posts(limit=10):
    """获取热门文章(按评论数,SQLAlchemy 2.0 风格)"""
    posts = db.session.execute(
        db.select(Post).join(Comment)
          .group_by(Post.id)
          .order_by(db.func.count(Comment.id).desc())
          .limit(limit)
    ).scalars().all()
    return posts

🔄 数据库迁移

1. 初始化迁移

Bash
# 初始化迁移仓库
flask db init

# 生成迁移脚本
flask db migrate -m "Initial migration"

# 应用迁移
flask db upgrade

# 回滚迁移
flask db downgrade

2. 创建迁移

Bash
# 修改模型后创建迁移
flask db migrate -m "Add user bio field"

# 查看迁移历史
flask db history

# 查看当前版本
flask db current

# 查看迁移状态
flask db show

💡 最佳实践

1. 数据库设计

Python
# 数据库设计最佳实践
database_design = {
    '规范化': '遵循数据库规范化原则',
    '索引': '为常用查询字段创建索引',
    '关系': '正确定义表之间的关系',
    '约束': '使用约束保证数据完整性',
    '事务': '使用事务保证数据一致性'
}

2. 性能优化

Python
# 数据库性能优化
performance_optimization = {
    '批量操作': '使用批量插入和更新',
    '延迟加载': '使用lazy加载减少查询',
    '缓存': '使用缓存减少数据库访问',
    '连接池': '配置数据库连接池',
    '查询优化': '优化SQL查询'
}

📝 练习题

基础题

  1. 什么是ORM?
  2. 如何定义数据模型?
  3. 如何进行基本的CRUD操作?

进阶题

  1. 实现复杂查询。
  2. 使用数据库迁移。
  3. 优化数据库性能。

实践题

  1. 创建用户和文章模型。
  2. 实现文章评论功能。
  3. 构建完整的数据库操作层。

📚 推荐阅读

🔗 下一章

用户认证 - 学习Flask-Login的使用。