目錄
- SQLAlchemy 介紹
- 環境準備與安裝
- 數據庫連接
- 數據模型定義
- 基本數據操作
- 復雜查詢操作
- 高級特性
- 實戰項目示例
- 性能優化與最佳實踐
- 常見問題與解決方案
1. SQLAlchemy 介紹
1.1 什么是SQLAlchemy
SQLAlchemy 是一個用于 Python 的 SQL 工具和對象關系映射(ORM)庫。它允許開發者通過 Python 代碼來與關系型數據庫交互,而不必直接編寫SQL語句。
對象關系映射(ORM) 是一種程序設計技術,用于實現面向對象編程語言里不同類型系統的數據之間的轉換。簡單來說,就是將數據庫表映射為Python類,將表中的記錄映射為類的實例。
1.2 SQLAlchemy 架構
SQLAlchemy 采用分層架構設計:
┌─────────────────────────────────────┐
│ ORM 層 │
│ (對象關系映射 - 高級抽象) │
├─────────────────────────────────────┤
│ Core 層 │
│ (SQL表達式語言 - 底層抽象) │
├─────────────────────────────────────┤
│ Engine 層 │
│ (數據庫引擎 - 連接管理) │
├─────────────────────────────────────┤
│ DBAPI 層 │
│ (數據庫驅動 - 具體實現) │
└─────────────────────────────────────┘
1.3 主要應用場景
SQLAlchemy 主要應用于以下場景:
- 數據庫訪問和操作:提供高層抽象來操作數據庫,避免編寫原生SQL語句
- ORM映射:建立Python類與數據庫表的映射關系,簡化數據模型操作
- 復雜查詢:提供豐富的查詢方式,如過濾、分組、聯結等
- 異步查詢:基于Greenlet等實現異步查詢,提高查詢效率
- 事務控制:通過Session管理數據庫會話和事務
- 多數據庫支持:支持PostgreSQL、MySQL、Oracle、SQLite等主流數據庫
- Web框架集成:與Flask、FastAPI等框架無縫集成
2. 環境準備與安裝
2.1 安裝SQLAlchemy
# 安裝SQLAlchemy核心庫
pip install sqlalchemy# 安裝數據庫驅動(根據需要選擇)
pip install pymysql # MySQL驅動
pip install psycopg2-binary # PostgreSQL驅動
pip install cx_Oracle # Oracle驅動
# SQLite驅動已內置在Python標準庫中
2.2 數據庫依賴對照表
數據庫類型 | 依賴庫 | 連接字符串示例 |
---|---|---|
關系型數據庫 | ||
MySQL | pymysql | mysql+pymysql://username:password@localhost:3306/database_name |
PostgreSQL | psycopg2 | postgresql://username:password@localhost:5432/database_name |
SQLite | 內置 | sqlite:///example.db |
Oracle | cx_Oracle | oracle://username:password@localhost:1521/orcl |
NoSQL數據庫 | ||
MongoDB | pymongo | mongodb://username:password@localhost:27017/database_name |
Redis | redis | redis://localhost:6379/0 |
2.3 驗證安裝
# 驗證SQLAlchemy安裝
import sqlalchemy
print(f"SQLAlchemy版本: {sqlalchemy.__version__}")# 測試數據庫連接
from sqlalchemy import create_engine# 創建內存SQLite數據庫進行測試
engine = create_engine('sqlite:///:memory:', echo=True)
print("數據庫連接測試成功!")
3. 數據庫連接
3.1 創建數據庫引擎
數據庫引擎是SQLAlchemy的核心組件,負責管理數據庫連接。
from sqlalchemy import create_engine# SQLite連接(文件數據庫)
engine = create_engine('sqlite:///example.db', echo=True)# MySQL連接
dbHost = 'mysql+pymysql://root:password@127.0.0.1:3306/test'
engine = create_engine(dbHost,echo=True, # 是否打印SQL語句pool_size=10, # 連接池大小max_overflow=20, # 超出連接池大小的連接數pool_pre_ping=True, # 連接前檢查連接有效性pool_recycle=3600 # 連接回收時間(秒)
)# PostgreSQL連接
engine = create_engine('postgresql://username:password@localhost:5432/database',echo=False,pool_size=5,max_overflow=10
)
3.2 引擎參數詳解
參數 | 說明 | 默認值 |
---|---|---|
echo | 是否打印執行的SQL語句 | False |
pool_size | 連接池保持的連接數 | 5 |
max_overflow | 允許超過pool_size的最大連接數 | 10 |
pool_timeout | 獲取連接的超時時間(秒) | 30 |
pool_recycle | 連接在連接池中保持的最長時間(秒) | -1 |
pool_pre_ping | 連接前檢查連接是否有效 | False |
3.3 連接池管理
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool# 自定義連接池配置
engine = create_engine('mysql+pymysql://user:password@localhost/dbname',poolclass=QueuePool,pool_size=20, # 連接池大小max_overflow=30, # 最大溢出連接數pool_timeout=60, # 獲取連接超時時間pool_recycle=7200, # 連接回收時間(2小時)echo=True
)# 獲取連接池狀態信息
print(f"連接池大小: {engine.pool.size()}")
print(f"已檢出連接數: {engine.pool.checkedout()}")
print(f"溢出連接數: {engine.pool.overflow()}")
4. 數據模型定義
4.1 聲明式基類
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float, DateTime, Boolean, Text
from datetime import datetime# 創建聲明式基類
Base = declarative_base()
4.2 基礎模型定義
class User(Base):"""用戶模型類"""__tablename__ = 'users' # 指定表名__table_args__ = {'comment': '用戶信息表'} # 表注釋# 定義字段id = Column(Integer, primary_key=True, autoincrement=True, comment='用戶ID')username = Column(String(50), nullable=False, unique=True, comment='用戶名')email = Column(String(100), nullable=False, index=True, comment='郵箱')password_hash = Column(String(128), nullable=False, comment='密碼哈希')full_name = Column(String(100), comment='全名')is_active = Column(Boolean, default=True, comment='是否激活')created_at = Column(DateTime, default=datetime.now, comment='創建時間')updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now, comment='更新時間')def __repr__(self):return f"<User(id={self.id}, username='{self.username}')>"def __str__(self):return f"用戶: {self.username} ({self.email})"
4.3 Column常用參數說明
參數 | 說明 | 示例 |
---|---|---|
primary_key | 是否為主鍵 | primary_key=True |
nullable | 是否允許為空 | nullable=False |
unique | 是否唯一 | unique=True |
index | 是否創建索引 | index=True |
default | 默認值 | default=0 |
onupdate | 更新時的默認值 | onupdate=datetime.now |
autoincrement | 是否自增 | autoincrement=True |
comment | 字段注釋 | comment='用戶ID' |
4.4 復雜數據類型示例
from sqlalchemy import JSON, DECIMAL, Enum
from sqlalchemy.dialects.mysql import LONGTEXT
import enumclass UserStatus(enum.Enum):"""用戶狀態枚舉"""ACTIVE = "active"INACTIVE = "inactive"SUSPENDED = "suspended"class Product(Base):"""商品模型類"""__tablename__ = 'products'id = Column(Integer, primary_key=True)name = Column(String(200), nullable=False, comment='商品名稱')description = Column(Text, comment='商品描述')price = Column(DECIMAL(10, 2), nullable=False, comment='價格')stock = Column(Integer, default=0, comment='庫存數量')# JSON字段存儲額外屬性attributes = Column(JSON, comment='商品屬性')# 枚舉字段status = Column(Enum(UserStatus), default=UserStatus.ACTIVE, comment='狀態')# 長文本字段content = Column(LONGTEXT, comment='詳細內容')created_at = Column(DateTime, default=datetime.now)updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
4.5 表關系定義
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationshipclass Category(Base):"""商品分類模型"""__tablename__ = 'categories'id = Column(Integer, primary_key=True)name = Column(String(100), nullable=False, comment='分類名稱')description = Column(Text, comment='分類描述')# 一對多關系:一個分類有多個商品products = relationship("Product", back_populates="category")class Product(Base):"""商品模型(帶關系)"""__tablename__ = 'products'id = Column(Integer, primary_key=True)name = Column(String(200), nullable=False)category_id = Column(Integer, ForeignKey('categories.id'), comment='分類ID')# 多對一關系:多個商品屬于一個分類category = relationship("Category", back_populates="products")# 多對多關系示例
from sqlalchemy import Table# 中間表定義
user_role_association = Table('user_roles',Base.metadata,Column('user_id', Integer, ForeignKey('users.id')),Column('role_id', Integer, ForeignKey('roles.id'))
)class Role(Base):"""角色模型"""__tablename__ = 'roles'id = Column(Integer, primary_key=True)name = Column(String(50), nullable=False, unique=True)description = Column(String(200))# 多對多關系:角色和用戶users = relationship("User", secondary=user_role_association, back_populates="roles")# 更新User模型,添加角色關系
User.roles = relationship("Role", secondary=user_role_association, back_populates="users")
5. 基本數據操作
5.1 創建表結構
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base# 創建引擎
engine = create_engine('sqlite:///example.db', echo=True)# 創建所有表
Base.metadata.create_all(engine)# 刪除所有表(謹慎使用)
# Base.metadata.drop_all(engine)# 創建特定表
# User.__table__.create(engine, checkfirst=True)# 刪除特定表
# User.__table__.drop(engine, checkfirst=True)
5.2 會話管理
from sqlalchemy.orm import sessionmaker, scoped_session# 創建會話類
Session = sessionmaker(bind=engine)# 方式1:手動管理會話
session = Session()
try:# 數據庫操作user = User(username='john', email='john@example.com')session.add(user)session.commit()
except Exception as e:session.rollback()print(f"操作失敗: {e}")
finally:session.close()# 方式2:使用上下文管理器(推薦)
with Session() as session:user = User(username='jane', email='jane@example.com')session.add(user)session.commit()# 自動關閉會話# 方式3:使用scoped_session(線程安全)
SessionLocal = scoped_session(sessionmaker(bind=engine))def get_session():"""獲取數據庫會話"""return SessionLocal()def close_session():"""關閉會話"""SessionLocal.remove()
5.3 插入數據
# 單條數據插入
with Session() as session:# 創建用戶對象new_user = User(username='alice',email='alice@example.com',full_name='Alice Smith',password_hash='hashed_password_here')# 添加到會話session.add(new_user)# 提交事務session.commit()# 獲取插入后的IDprint(f"新用戶ID: {new_user.id}")# 批量插入
with Session() as session:users = [User(username='bob', email='bob@example.com', full_name='Bob Johnson'),User(username='charlie', email='charlie@example.com', full_name='Charlie Brown'),User(username='diana', email='diana@example.com', full_name='Diana Prince')]# 批量添加session.add_all(users)session.commit()print(f"批量插入了 {len(users)} 個用戶")# 使用bulk_insert_mappings(高性能批量插入)
with Session() as session:user_data = [{'username': 'user1', 'email': 'user1@example.com', 'full_name': 'User One'},{'username': 'user2', 'email': 'user2@example.com', 'full_name': 'User Two'},{'username': 'user3', 'email': 'user3@example.com', 'full_name': 'User Three'}]session.bulk_insert_mappings(User, user_data)session.commit()
5.4 查詢數據
with Session() as session:# 查詢所有用戶all_users = session.query(User).all()print(f"總用戶數: {len(all_users)}")# 查詢第一個用戶first_user = session.query(User).first()print(f"第一個用戶: {first_user}")# 根據ID查詢user_by_id = session.query(User).get(1) # 主鍵查詢if user_by_id:print(f"ID為1的用戶: {user_by_id.username}")# 條件查詢active_users = session.query(User).filter(User.is_active == True).all()print(f"活躍用戶數: {len(active_users)}")# 單條記錄查詢(確保唯一)try:unique_user = session.query(User).filter(User.username == 'alice').one()print(f"找到用戶: {unique_user.email}")except Exception as e:print(f"查詢失敗: {e}")# 可能為空的單條查詢maybe_user = session.query(User).filter(User.username == 'nonexistent').one_or_none()if maybe_user:print(f"找到用戶: {maybe_user.username}")else:print("用戶不存在")
5.5 更新數據
with Session() as session:# 方式1:查詢后更新user = session.query(User).filter(User.username == 'alice').first()if user:user.full_name = 'Alice Johnson' # 修改屬性user.updated_at = datetime.now() # 更新時間session.commit()print(f"用戶 {user.username} 信息已更新")# 方式2:批量更新updated_count = session.query(User).filter(User.is_active == True).update({User.updated_at: datetime.now()})session.commit()print(f"批量更新了 {updated_count} 個用戶")# 方式3:條件更新session.query(User).filter(User.created_at < datetime(2023, 1, 1)).update({User.is_active: False,User.updated_at: datetime.now()})session.commit()
5.6 刪除數據
with Session() as session:# 方式1:查詢后刪除user_to_delete = session.query(User).filter(User.username == 'bob').first()if user_to_delete:session.delete(user_to_delete)session.commit()print(f"用戶 {user_to_delete.username} 已刪除")# 方式2:批量刪除deleted_count = session.query(User).filter(User.is_active == False).delete()session.commit()print(f"批量刪除了 {deleted_count} 個用戶")# 方式3:條件刪除session.query(User).filter(User.created_at < datetime(2022, 1, 1)).delete()session.commit()
6. 復雜查詢操作
6.1 條件查詢
from sqlalchemy import and_, or_, not_, func
from sqlalchemy.sql import textwith Session() as session:# 基本條件查詢users = session.query(User).filter(User.is_active == True).all()# 多條件查詢(AND)users = session.query(User).filter(and_(User.is_active == True,User.created_at > datetime(2023, 1, 1))).all()# 或條件查詢(OR)users = session.query(User).filter(or_(User.username.like('%admin%'),User.email.like('%@admin.com'))).all()# 非條件查詢(NOT)users = session.query(User).filter(not_(User.is_active == False)).all()# IN查詢user_ids = [1, 2, 3, 4, 5]users = session.query(User).filter(User.id.in_(user_ids)).all()# NOT IN查詢users = session.query(User).filter(~User.id.in_(user_ids)).all()# LIKE模糊查詢users = session.query(User).filter(User.username.like('%john%')).all()# ILIKE不區分大小寫查詢(PostgreSQL)users = session.query(User).filter(User.username.ilike('%JOHN%')).all()# BETWEEN范圍查詢users = session.query(User).filter(User.created_at.between(datetime(2023, 1, 1), datetime(2023, 12, 31))).all()# IS NULL查詢users = session.query(User).filter(User.full_name.is_(None)).all()# IS NOT NULL查詢users = session.query(User).filter(User.full_name.isnot(None)).all()# 原生SQL條件users = session.query(User).filter(text("username LIKE :pattern")).params(pattern='%admin%').all()
6.2 排序和分頁
with Session() as session:# 升序排序users = session.query(User).order_by(User.created_at).all()# 降序排序users = session.query(User).order_by(User.created_at.desc()).all()# 多字段排序users = session.query(User).order_by(User.is_active.desc(), # 先按活躍狀態降序User.created_at.asc() # 再按創建時間升序).all()# 限制結果數量recent_users = session.query(User).order_by(User.created_at.desc()).limit(10).all()# 跳過指定數量的記錄users = session.query(User).offset(20).limit(10).all()# 分頁查詢page = 1per_page = 10users = session.query(User).offset((page - 1) * per_page).limit(per_page).all()# 使用slice進行分頁(更Pythonic)users = session.query(User)[20:30] # 獲取第21-30條記錄
6.3 聚合查詢
from sqlalchemy import func, distinctwith Session() as session:# 計數查詢total_users = session.query(func.count(User.id)).scalar()print(f"總用戶數: {total_users}")# 去重計數unique_emails = session.query(func.count(distinct(User.email))).scalar()print(f"唯一郵箱數: {unique_emails}")# 最大值、最小值latest_user = session.query(func.max(User.created_at)).scalar()earliest_user = session.query(func.min(User.created_at)).scalar()# 平均值(假設User有age字段)# avg_age = session.query(func.avg(User.age)).scalar()# 求和# total_age = session.query(func.sum(User.age)).scalar()# 分組查詢user_counts_by_status = session.query(User.is_active,func.count(User.id).label('count')).group_by(User.is_active).all()for is_active, count in user_counts_by_status:status = "活躍" if is_active else "非活躍"print(f"{status}用戶數: {count}")# HAVING子句(分組后篩選)active_email_domains = session.query(func.substr(User.email, func.instr(User.email, '@') + 1).label('domain'),func.count(User.id).label('count')).filter(User.is_active == True).group_by(func.substr(User.email, func.instr(User.email, '@') + 1)).having(func.count(User.id) > 1 # 只顯示用戶數大于1的域名).all()
6.4 連接查詢(JOIN)
with Session() as session:# 內連接(INNER JOIN)results = session.query(User, Product).join(Product, User.id == Product.user_id).all()# 左外連接(LEFT OUTER JOIN)results = session.query(User, Product).outerjoin(Product, User.id == Product.user_id).all()# 使用relationship進行連接results = session.query(User).join(User.products).all()# 連接查詢with條件results = session.query(User, Product).join(Product).filter(Product.price > 100).all()# 多表連接results = session.query(User, Product, Category).join(Product).join(Category).filter(Category.name == 'Electronics').all()# 自連接(查找同一表中的關聯數據)# 假設User表有manager_id字段manager_alias = aliased(User)results = session.query(User, manager_alias).join(manager_alias, User.manager_id == manager_alias.id).all()
6.5 子查詢
from sqlalchemy.orm import aliasedwith Session() as session:# 標量子查詢avg_price_subquery = session.query(func.avg(Product.price)).scalar_subquery()expensive_products = session.query(Product).filter(Product.price > avg_price_subquery).all()# 表子查詢user_product_count = session.query(Product.user_id,func.count(Product.id).label('product_count')).group_by(Product.user_id).subquery()# 使用子查詢結果productive_users = session.query(User).join(user_product_count, User.id == user_product_count.c.user_id).filter(user_product_count.c.product_count > 5).all()# EXISTS子查詢from sqlalchemy.sql import existsusers_with_products = session.query(User).filter(exists().where(Product.user_id == User.id)).all()# NOT EXISTS子查詢users_without_products = session.query(User).filter(~exists().where(Product.user_id == User.id)).all()# IN子查詢active_user_ids = session.query(User.id).filter(User.is_active == True).subquery()products_by_active_users = session.query(Product).filter(Product.user_id.in_(active_user_ids)).all()
6.6 窗口函數(高級查詢)
from sqlalchemy import funcwith Session() as session:# ROW_NUMBER() 窗口函數results = session.query(User.id,User.username,func.row_number().over(order_by=User.created_at.desc()).label('row_num')).all()# RANK() 窗口函數results = session.query(Product.id,Product.name,Product.price,func.rank().over(order_by=Product.price.desc()).label('price_rank')).all()# 分區窗口函數results = session.query(Product.id,Product.name,Product.category_id,Product.price,func.row_number().over(partition_by=Product.category_id,order_by=Product.price.desc()).label('category_rank')).all()# LAG/LEAD 窗口函數(獲取前一行/后一行數據)results = session.query(User.id,User.username,User.created_at,func.lag(User.created_at, 1).over(order_by=User.created_at).label('prev_created_at')).all()
7. 高級特性
7.1 事務管理
from sqlalchemy.exc import SQLAlchemyError# 基本事務管理
with Session() as session:try:# 開始事務(自動開始)user1 = User(username='user1', email='user1@example.com')user2 = User(username='user2', email='user2@example.com')session.add(user1)session.add(user2)# 提交事務session.commit()print("事務提交成功")except SQLAlchemyError as e:# 回滾事務session.rollback()print(f"事務回滾: {e}")raise# 嵌套事務(保存點)
with Session() as session:try:user = User(username='main_user', email='main@example.com')session.add(user)# 創建保存點savepoint = session.begin_nested()try:# 可能失敗的操作risky_user = User(username='risky', email='invalid_email')session.add(risky_user)session.flush() # 強制執行SQL但不提交# 如果成功,提交保存點savepoint.commit()except Exception as e:# 回滾到保存點savepoint.rollback()print(f"保存點回滾: {e}")# 主事務提交session.commit()except Exception as e:session.rollback()print(f"主事務回滾: {e}")
7.2 懶加載與預加載
from sqlalchemy.orm import joinedload, selectinload, subqueryload# 懶加載(默認行為)
with Session() as session:user = session.query(User).first()# 訪問關聯對象時才執行查詢products = user.products # 這里會觸發額外的SQL查詢# 預加載 - joinedload(使用JOIN)
with Session() as session:users = session.query(User).options(joinedload(User.products)).all()# 一次查詢獲取用戶和產品數據for user in users:print(f"用戶 {user.username} 有 {len(user.products)} 個產品")# 預加載 - selectinload(使用IN查詢)
with Session() as session:users = session.query(User).options(selectinload(User.products)).all()# 兩次查詢:先查用戶,再用IN查詢產品# 預加載 - subqueryload(使用子查詢)
with Session() as session:users = session.query(User).options(subqueryload(User.products)).all()# 使用子查詢預加載關聯數據# 多層預加載
with Session() as session:users = session.query(User).options(joinedload(User.products).joinedload(Product.category)).all()# 一次查詢獲取用戶、產品和分類數據# 選擇性預加載
with Session() as session:users = session.query(User).options(selectinload(User.products).selectinload(Product.reviews)).filter(User.is_active == True).all()
7.3 緩存機制
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session# 一級緩存(Session級別)
with Session() as session:# 第一次查詢,從數據庫獲取user1 = session.query(User).get(1)# 第二次查詢,從Session緩存獲取user2 = session.query(User).get(1)# 兩個對象是同一個實例assert user1 is user2print("Session緩存生效")# 二級緩存(需要額外配置)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session
from sqlalchemy import event# 簡單的內存緩存實現
class SimpleCache:def __init__(self):self._cache = {}def get(self, key):return self._cache.get(key)def set(self, key, value):self._cache[key] = valuedef delete(self, key):self._cache.pop(key, None)cache = SimpleCache()# 查詢緩存裝飾器
def cached_query(cache_key):def decorator(func):def wrapper(*args, **kwargs):# 嘗試從緩存獲取result = cache.get(cache_key)if result is not None:print(f"從緩存獲取: {cache_key}")return result# 執行查詢result = func(*args, **kwargs)# 存入緩存cache.set(cache_key, result)print(f"存入緩存: {cache_key}")return resultreturn wrapperreturn decorator@cached_query('all_active_users')
def get_active_users(session):return session.query(User).filter(User.is_active == True).all()
7.4 數據庫遷移
# 使用Alembic進行數據庫遷移
# 首先安裝: pip install alembic# 初始化遷移環境
# alembic init migrations# 創建遷移腳本
# alembic revision --autogenerate -m "創建用戶表"# 執行遷移
# alembic upgrade head# 回滾遷移
# alembic downgrade -1# 編程方式創建遷移
from alembic import command
from alembic.config import Configdef run_migrations():"""運行數據庫遷移"""alembic_cfg = Config("alembic.ini")command.upgrade(alembic_cfg, "head")def create_migration(message):"""創建新的遷移文件"""alembic_cfg = Config("alembic.ini")command.revision(alembic_cfg, autogenerate=True, message=message)
7.5 異步支持
# SQLAlchemy 1.4+ 異步支持
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
import asyncio# 創建異步引擎
async_engine = create_async_engine("postgresql+asyncpg://user:password@localhost/dbname",echo=True
)# 創建異步會話
AsyncSessionLocal = async_sessionmaker(async_engine,class_=AsyncSession,expire_on_commit=False
)# 異步數據操作
async def create_user_async(username: str, email: str):"""異步創建用戶"""async with AsyncSessionLocal() as session:user = User(username=username, email=email)session.add(user)await session.commit()return userasync def get_users_async():"""異步獲取用戶列表"""async with AsyncSessionLocal() as session:result = await session.execute(select(User).filter(User.is_active == True))return result.scalars().all()# 運行異步函數
async def main():# 創建表async with async_engine.begin() as conn:await conn.run_sync(Base.metadata.create_all)# 創建用戶user = await create_user_async("async_user", "async@example.com")print(f"創建用戶: {user.username}")# 查詢用戶users = await get_users_async()print(f"查詢到 {len(users)} 個用戶")# 運行異步代碼
# asyncio.run(main())
8. 實戰項目示例
8.1 項目結構
blog_project/
├── main.py # 應用入口
├── database.py # 數據庫配置
├── models.py # 數據模型
├── schemas.py # 數據驗證模式
├── crud.py # 數據庫操作
├── api.py # API接口
└── requirements.txt # 依賴包
8.2 數據庫配置 (database.py)
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import os# 數據庫URL配置
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./blog.db"
)# 創建數據庫引擎
engine = create_engine(DATABASE_URL,echo=True, # 開發環境顯示SQLpool_pre_ping=True,pool_recycle=3600
)# 創建會話工廠
SessionLocal = sessionmaker(autocommit=False,autoflush=False,bind=engine
)# 創建基類
Base = declarative_base()# 依賴注入:獲取數據庫會話
def get_db():"""獲取數據庫會話"""db = SessionLocal()try:yield dbfinally:db.close()# 創建所有表
def create_tables():"""創建數據庫表"""Base.metadata.create_all(bind=engine)# 刪除所有表
def drop_tables():"""刪除數據庫表"""Base.metadata.drop_all(bind=engine)
8.3 數據模型 (models.py)
from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, ForeignKey, Table
from sqlalchemy.orm import relationship
from datetime import datetime
from database import Base# 多對多關系中間表:文章標簽
article_tags = Table('article_tags',Base.metadata,Column('article_id', Integer, ForeignKey('articles.id')),Column('tag_id', Integer, ForeignKey('tags.id'))
)class User(Base):"""用戶模型"""__tablename__ = 'users'id = Column(Integer, primary_key=True, index=True)username = Column(String(50), unique=True, index=True, nullable=False)email = Column(String(100), unique=True, index=True, nullable=False)hashed_password = Column(String(100), nullable=False)full_name = Column(String(100))is_active = Column(Boolean, default=True)is_superuser = Column(Boolean, default=False)created_at = Column(DateTime, default=datetime.now)updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)# 關系:一個用戶可以有多篇文章articles = relationship("Article", back_populates="author", cascade="all, delete-orphan")comments = relationship("Comment", back_populates="author", cascade="all, delete-orphan")def __repr__(self):return f"<User(id={self.id}, username='{self.username}')>"class Category(Base):"""分類模型"""__tablename__ = 'categories'id = Column(Integer, primary_key=True, index=True)name = Column(String(50), unique=True, nullable=False)description = Column(Text)created_at = Column(DateTime, default=datetime.now)# 關系:一個分類可以有多篇文章articles = relationship("Article", back_populates="category")def __repr__(self):return f"<Category(id={self.id}, name='{self.name}')>"class Tag(Base):"""標簽模型"""__tablename__ = 'tags'id = Column(Integer, primary_key=True, index=True)name = Column(String(30), unique=True, nullable=False)created_at = Column(DateTime, default=datetime.now)# 多對多關系:標簽和文章articles = relationship("Article", secondary=article_tags, back_populates="tags")def __repr__(self):return f"<Tag(id={self.id}, name='{self.name}')>"class Article(Base):"""文章模型"""__tablename__ = 'articles'id = Column(Integer, primary_key=True, index=True)title = Column(String(200), nullable=False, index=True)content = Column(Text, nullable=False)summary = Column(String(500))is_published = Column(Boolean, default=False)view_count = Column(Integer, default=0)created_at = Column(DateTime, default=datetime.now, index=True)updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)published_at = Column(DateTime)# 外鍵author_id = Column(Integer, ForeignKey('users.id'), nullable=False)category_id = Column(Integer, ForeignKey('categories.id'))# 關系author = relationship("User", back_populates="articles")category = relationship("Category", back_populates="articles")tags = relationship("Tag", secondary=article_tags, back_populates="articles")comments = relationship("Comment", back_populates="article", cascade="all, delete-orphan")def __repr__(self):return f"<Article(id={self.id}, title='{self.title[:30]}...')>"class Comment(Base):"""評論模型"""__tablename__ = 'comments'id = Column(Integer, primary_key=True, index=True)content = Column(Text, nullable=False)is_approved = Column(Boolean, default=False)created_at = Column(DateTime, default=datetime.now)# 外鍵article_id = Column(Integer, ForeignKey('articles.id'), nullable=False)author_id = Column(Integer, ForeignKey('users.id'), nullable=False)parent_id = Column(Integer, ForeignKey('comments.id')) # 回復評論# 關系article = relationship("Article", back_populates="comments")author = relationship("User", back_populates="comments")parent = relationship("Comment", remote_side=[id]) # 自引用關系def __repr__(self):return f"<Comment(id={self.id}, content='{self.content[:30]}...')>"
8.4 數據操作層 (crud.py)
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, func, desc
from models import User, Article, Category, Tag, Comment
from typing import List, Optional
from datetime import datetimeclass UserCRUD:"""用戶數據操作"""@staticmethoddef create_user(db: Session, username: str, email: str, password: str, full_name: str = None) -> User:"""創建用戶"""user = User(username=username,email=email,hashed_password=password, # 實際應用中需要加密full_name=full_name)db.add(user)db.commit()db.refresh(user)return user@staticmethoddef get_user_by_id(db: Session, user_id: int) -> Optional[User]:"""根據ID獲取用戶"""return db.query(User).filter(User.id == user_id).first()@staticmethoddef get_user_by_username(db: Session, username: str) -> Optional[User]:"""根據用戶名獲取用戶"""return db.query(User).filter(User.username == username).first()@staticmethoddef get_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:"""獲取用戶列表"""return db.query(User).offset(skip).limit(limit).all()@staticmethoddef update_user(db: Session, user_id: int, **kwargs) -> Optional[User]:"""更新用戶信息"""user = db.query(User).filter(User.id == user_id).first()if user:for key, value in kwargs.items():if hasattr(user, key):setattr(user, key, value)user.updated_at = datetime.now()db.commit()db.refresh(user)return user@staticmethoddef delete_user(db: Session, user_id: int) -> bool:"""刪除用戶"""user = db.query(User).filter(User.id == user_id).first()if user:db.delete(user)db.commit()return Truereturn Falseclass ArticleCRUD:"""文章數據操作"""@staticmethoddef create_article(db: Session, title: str, content: str, author_id: int, category_id: int = None, tag_names: List[str] = None) -> Article:"""創建文章"""article = Article(title=title,content=content,author_id=author_id,category_id=category_id,summary=content[:200] + "..." if len(content) > 200 else content)# 處理標簽if tag_names:for tag_name in tag_names:tag = db.query(Tag).filter(Tag.name == tag_name).first()if not tag:tag = Tag(name=tag_name)db.add(tag)article.tags.append(tag)db.add(article)db.commit()db.refresh(article)return article@staticmethoddef get_article_by_id(db: Session, article_id: int) -> Optional[Article]:"""根據ID獲取文章"""return db.query(Article).filter(Article.id == article_id).first()@staticmethoddef get_articles(db: Session, skip: int = 0, limit: int = 20, published_only: bool = True) -> List[Article]:"""獲取文章列表"""query = db.query(Article)if published_only:query = query.filter(Article.is_published == True)return query.order_by(desc(Article.created_at)).offset(skip).limit(limit).all()@staticmethoddef get_articles_by_category(db: Session, category_id: int, skip: int = 0, limit: int = 20) -> List[Article]:"""根據分類獲取文章"""return db.query(Article).filter(and_(Article.category_id == category_id, Article.is_published == True)).order_by(desc(Article.created_at)).offset(skip).limit(limit).all()@staticmethoddef get_articles_by_tag(db: Session, tag_name: str, skip: int = 0, limit: int = 20) -> List[Article]:"""根據標簽獲取文章"""return db.query(Article).join(Article.tags).filter(and_(Tag.name == tag_name, Article.is_published == True)).order_by(desc(Article.created_at)).offset(skip).limit(limit).all()@staticmethoddef search_articles(db: Session, keyword: str, skip: int = 0, limit: int = 20) -> List[Article]:"""搜索文章"""return db.query(Article).filter(and_(or_(Article.title.contains(keyword),Article.content.contains(keyword)),Article.is_published == True)).order_by(desc(Article.created_at)).offset(skip).limit(limit).all()@staticmethoddef publish_article(db: Session, article_id: int) -> Optional[Article]:"""發布文章"""article = db.query(Article).filter(Article.id == article_id).first()if article:article.is_published = Truearticle.published_at = datetime.now()db.commit()db.refresh(article)return article@staticmethoddef increment_view_count(db: Session, article_id: int) -> Optional[Article]:"""增加文章瀏覽量"""article = db.query(Article).filter(Article.id == article_id).first()if article:article.view_count += 1db.commit()db.refresh(article)return articleclass StatisticsCRUD:"""統計數據操作"""@staticmethoddef get_article_stats(db: Session) -> dict:"""獲取文章統計信息"""total_articles = db.query(func.count(Article.id)).scalar()published_articles = db.query(func.count(Article.id)).filter(Article.is_published == True).scalar()total_views = db.query(func.sum(Article.view_count)).scalar() or 0return {"total_articles": total_articles,"published_articles": published_articles,"draft_articles": total_articles - published_articles,"total_views": total_views}@staticmethoddef get_popular_articles(db: Session, limit: int = 10) -> List[Article]:"""獲取熱門文章"""return db.query(Article).filter(Article.is_published == True).order_by(desc(Article.view_count)).limit(limit).all()@staticmethoddef get_category_stats(db: Session) -> List[dict]:"""獲取分類統計"""results = db.query(Category.name,func.count(Article.id).label('article_count')).outerjoin(Article).group_by(Category.id, Category.name).all()return [{"category": name, "count": count}for name, count in results]@staticmethoddef get_monthly_article_stats(db: Session, year: int) -> List[dict]:"""獲取月度文章統計"""results = db.query(func.extract('month', Article.created_at).label('month'),func.count(Article.id).label('count')).filter(and_(func.extract('year', Article.created_at) == year,Article.is_published == True)).group_by(func.extract('month', Article.created_at)).all()return [{"month": int(month), "count": count}for month, count in results]
8.5 API接口層 (api.py)
from fastapi import FastAPI, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from database import get_db, create_tables
from crud import UserCRUD, ArticleCRUD, StatisticsCRUD
from typing import List, Optional
import uvicorn# 創建FastAPI應用
app = FastAPI(title="博客系統API",description="基于SQLAlchemy的博客系統",version="1.0.0"
)# 啟動時創建表
@app.on_event("startup")
def startup_event():create_tables()# 用戶相關接口
@app.post("/users/", summary="創建用戶")
def create_user(username: str,email: str,password: str,full_name: Optional[str] = None,db: Session = Depends(get_db)
):"""創建新用戶"""# 檢查用戶名是否已存在existing_user = UserCRUD.get_user_by_username(db, username)if existing_user:raise HTTPException(status_code=400, detail="用戶名已存在")user = UserCRUD.create_user(db, username, email, password, full_name)return {"message": "用戶創建成功", "user_id": user.id}@app.get("/users/{user_id}", summary="獲取用戶信息")
def get_user(user_id: int, db: Session = Depends(get_db)):"""根據ID獲取用戶信息"""user = UserCRUD.get_user_by_id(db, user_id)if not user:raise HTTPException(status_code=404, detail="用戶不存在")return {"id": user.id,"username": user.username,"email": user.email,"full_name": user.full_name,"is_active": user.is_active,"created_at": user.created_at}@app.get("/users/", summary="獲取用戶列表")
def get_users(skip: int = Query(0, ge=0),limit: int = Query(20, ge=1, le=100),db: Session = Depends(get_db)
):"""獲取用戶列表"""users = UserCRUD.get_users(db, skip, limit)return {"users": [{"id": user.id,"username": user.username,"email": user.email,"full_name": user.full_name,"is_active": user.is_active}for user in users],"total": len(users)}# 文章相關接口
@app.post("/articles/", summary="創建文章")
def create_article(title: str,content: str,author_id: int,category_id: Optional[int] = None,tag_names: Optional[List[str]] = None,db: Session = Depends(get_db)
):"""創建新文章"""# 驗證作者是否存在author = UserCRUD.get_user_by_id(db, author_id)if not author:raise HTTPException(status_code=404, detail="作者不存在")article = ArticleCRUD.create_article(db, title, content, author_id, category_id, tag_names or [])return {"message": "文章創建成功", "article_id": article.id}@app.get("/articles/{article_id}", summary="獲取文章詳情")
def get_article(article_id: int, db: Session = Depends(get_db)):"""獲取文章詳情"""article = ArticleCRUD.get_article_by_id(db, article_id)if not article:raise HTTPException(status_code=404, detail="文章不存在")# 增加瀏覽量ArticleCRUD.increment_view_count(db, article_id)return {"id": article.id,"title": article.title,"content": article.content,"summary": article.summary,"is_published": article.is_published,"view_count": article.view_count,"created_at": article.created_at,"author": {"id": article.author.id,"username": article.author.username},"category": {"id": article.category.id,"name": article.category.name} if article.category else None,"tags": [{"id": tag.id, "name": tag.name}for tag in article.tags]}@app.get("/articles/", summary="獲取文章列表")
def get_articles(skip: int = Query(0, ge=0),limit: int = Query(20, ge=1, le=100),published_only: bool = Query(True),db: Session = Depends(get_db)
):"""獲取文章列表"""articles = ArticleCRUD.get_articles(db, skip, limit, published_only)return {"articles": [{"id": article.id,"title": article.title,"summary": article.summary,"view_count": article.view_count,"created_at": article.created_at,"author": article.author.username}for article in articles],"total": len(articles)}@app.get("/search/articles/", summary="搜索文章")
def search_articles(keyword: str = Query(..., min_length=1),skip: int = Query(0, ge=0),limit: int = Query(20, ge=1, le=100),db: Session = Depends(get_db)
):"""搜索文章"""articles = ArticleCRUD.search_articles(db, keyword, skip, limit)return {"keyword": keyword,"articles": [{"id": article.id,"title": article.title,"summary": article.summary,"view_count": article.view_count,"created_at": article.created_at}for article in articles],"total": len(articles)}# 統計相關接口
@app.get("/statistics/overview", summary="獲取統計概覽")
def get_statistics_overview(db: Session = Depends(get_db)):"""獲取統計概覽"""stats = StatisticsCRUD.get_article_stats(db)category_stats = StatisticsCRUD.get_category_stats(db)popular_articles = StatisticsCRUD.get_popular_articles(db, 5)return {"article_stats": stats,"category_stats": category_stats,"popular_articles": [{"id": article.id,"title": article.title,"view_count": article.view_count}for article in popular_articles]}if __name__ == "__main__":uvicorn.run(app, host="0.0.0.0", port=8000)
8.6 應用入口 (main.py)
from database import create_tables, get_db
from crud import UserCRUD, ArticleCRUD
from models import User, Article, Category, Tag
from datetime import datetimedef init_sample_data():"""初始化示例數據"""# 獲取數據庫會話db = next(get_db())try:# 創建示例用戶admin_user = UserCRUD.create_user(db, "admin", "admin@blog.com", "hashed_password", "管理員")author_user = UserCRUD.create_user(db, "author", "author@blog.com", "hashed_password", "作者")# 創建示例分類tech_category = Category(name="技術", description="技術相關文章")life_category = Category(name="生活", description="生活隨筆")db.add_all([tech_category, life_category])db.commit()# 創建示例文章article1 = ArticleCRUD.create_article(db,title="SQLAlchemy入門教程",content="這是一篇關于SQLAlchemy的詳細教程...",author_id=author_user.id,category_id=tech_category.id,tag_names=["Python", "數據庫", "ORM"])article2 = ArticleCRUD.create_article(db,title="我的編程之路",content="分享我學習編程的心得體會...",author_id=author_user.id,category_id=life_category.id,tag_names=["編程", "心得"])# 發布文章ArticleCRUD.publish_article(db, article1.id)ArticleCRUD.publish_article(db, article2.id)print("示例數據初始化完成!")except Exception as e:print(f"初始化數據失敗: {e}")db.rollback()finally:db.close()def main():"""主函數"""print("=== SQLAlchemy博客系統 ===")# 創建數據庫表print("創建數據庫表...")create_tables()# 初始化示例數據print("初始化示例數據...")init_sample_data()print("\n系統啟動完成!")print("API文檔地址: http://localhost:8000/docs")print("啟動API服務器: python api.py")if __name__ == "__main__":main()
9. 性能優化與最佳實踐
9.1 查詢優化
# 1. 使用索引
class User(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)username = Column(String(50), index=True) # 單列索引email = Column(String(100), index=True)created_at = Column(DateTime, index=True)# 復合索引__table_args__ = (Index('idx_username_email', 'username', 'email'),Index('idx_active_created', 'is_active', 'created_at'),)# 2. 預加載關聯數據
from sqlalchemy.orm import joinedload, selectinload# 避免N+1查詢問題
with Session() as session:# 錯誤方式:會產生N+1查詢users = session.query(User).all()for user in users:print(f"{user.username}: {len(user.articles)} 篇文章") # 每次都查詢# 正確方式:使用預加載users = session.query(User).options(selectinload(User.articles)).all()for user in users:print(f"{user.username}: {len(user.articles)} 篇文章") # 不會額外查詢# 3. 使用批量操作
with Session() as session:# 批量插入users_data = [{'username': f'user{i}', 'email': f'user{i}@example.com'}for i in range(1000)]session.bulk_insert_mappings(User, users_data)# 批量更新session.query(User).filter(User.created_at < datetime(2023, 1, 1)).update({'is_active': False})session.commit()# 4. 使用原生SQL進行復雜查詢
from sqlalchemy import textwith Session() as session:# 復雜統計查詢使用原生SQLresult = session.execute(text("""SELECT c.name as category_name,COUNT(a.id) as article_count,AVG(a.view_count) as avg_viewsFROM categories cLEFT JOIN articles a ON c.id = a.category_idWHERE a.is_published = trueGROUP BY c.id, c.nameORDER BY article_count DESC"""))for row in result:print(f"分類: {row.category_name}, 文章數: {row.article_count}, 平均瀏覽: {row.avg_views}")
9.2 連接池優化
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool# 生產環境連接池配置
engine = create_engine(DATABASE_URL,# 連接池配置poolclass=QueuePool,pool_size=20, # 連接池大小max_overflow=30, # 最大溢出連接pool_timeout=60, # 獲取連接超時pool_recycle=3600, # 連接回收時間pool_pre_ping=True, # 連接前檢查# 性能優化echo=False, # 生產環境關閉SQL日志future=True, # 使用2.0風格API# 連接參數connect_args={"charset": "utf8mb4","autocommit": False,"check_same_thread": False # SQLite專用}
)# 監控連接池狀態
def monitor_pool_status(engine):"""監控連接池狀態"""pool = engine.poolprint(f"連接池大小: {pool.size()}")print(f"已檢出連接: {pool.checkedout()}")print(f"溢出連接: {pool.overflow()}")print(f"無效連接: {pool.invalidated()}")
9.3 緩存策略
import redis
import json
from functools import wraps
from typing import Any, Optional# Redis緩存配置
redis_client = redis.Redis(host='localhost',port=6379,db=0,decode_responses=True
)def cache_result(key_prefix: str, expire_time: int = 3600):"""結果緩存裝飾器"""def decorator(func):@wraps(func)def wrapper(*args, **kwargs):# 生成緩存鍵cache_key = f"{key_prefix}:{hash(str(args) + str(kwargs))}"# 嘗試從緩存獲取cached_result = redis_client.get(cache_key)if cached_result:return json.loads(cached_result)# 執行函數result = func(*args, **kwargs)# 存入緩存redis_client.setex(cache_key,expire_time,json.dumps(result, default=str))return resultreturn wrapperreturn decorator# 使用緩存
class CachedArticleCRUD(ArticleCRUD):"""帶緩存的文章操作"""@staticmethod@cache_result("hot_articles", 1800) # 緩存30分鐘def get_hot_articles(db: Session, limit: int = 10) -> List[dict]:"""獲取熱門文章(帶緩存)"""articles = db.query(Article).filter(Article.is_published == True).order_by(Article.view_count.desc()).limit(limit).all()return [{"id": article.id,"title": article.title,"view_count": article.view_count}for article in articles]@staticmethoddef invalidate_article_cache(article_id: int):"""清除文章相關緩存"""patterns = [f"article:{article_id}:*","hot_articles:*","recent_articles:*"]for pattern in patterns:keys = redis_client.keys(pattern)if keys:redis_client.delete(*keys)
9.4 數據庫分頁優化
from sqlalchemy import func
from typing import Tuple, Listclass PaginationHelper:"""分頁助手類"""@staticmethoddef paginate_query(query, page: int, per_page: int) -> Tuple[List, dict]:"""查詢分頁"""# 計算總數total = query.count()# 計算分頁信息total_pages = (total + per_page - 1) // per_pagehas_prev = page > 1has_next = page < total_pages# 獲取當前頁數據items = query.offset((page - 1) * per_page).limit(per_page).all()pagination_info = {"page": page,"per_page": per_page,"total": total,"total_pages": total_pages,"has_prev": has_prev,"has_next": has_next,"prev_page": page - 1 if has_prev else None,"next_page": page + 1 if has_next else None}return items, pagination_info@staticmethoddef cursor_paginate(query, cursor_field, cursor_value=None, limit: int = 20, desc: bool = True):"""游標分頁(適合大數據量)"""if cursor_value is not None:if desc:query = query.filter(cursor_field < cursor_value)else:query = query.filter(cursor_field > cursor_value)if desc:query = query.order_by(cursor_field.desc())else:query = query.order_by(cursor_field.asc())items = query.limit(limit + 1).all()has_more = len(items) > limitif has_more:items = items[:-1]next_cursor = Noneif has_more and items:next_cursor = getattr(items[-1], cursor_field.name)return items, {"has_more": has_more,"next_cursor": next_cursor,"limit": limit}# 使用示例
with Session() as session:# 傳統分頁query = session.query(Article).filter(Article.is_published == True)articles, pagination = PaginationHelper.paginate_query(query, page=1, per_page=20)# 游標分頁(適合實時數據)articles, cursor_info = PaginationHelper.cursor_paginate(query, Article.created_at, limit=20)
10. 常見問題與解決方案
10.1 常見錯誤及解決方案
# 1. 解決"DetachedInstanceError"錯誤
from sqlalchemy.orm import make_transientwith Session() as session:user = session.query(User).first()# 會話關閉后訪問關聯對象會報錯# 解決方案1:在會話內訪問所有需要的數據
with Session() as session:user = session.query(User).options(joinedload(User.articles)).first()# 在會話內訪問關聯數據articles = user.articles# 解決方案2:使用expunge_all()或merge()
with Session() as session:user = session.query(User).first()session.expunge(user) # 從會話中移除對象# 在新會話中重新附加
with Session() as session:user = session.merge(user) # 重新附加到會話articles = user.articles# 2. 解決"IntegrityError"約束違反錯誤
try:with Session() as session:user = User(username="existing_user", email="test@example.com")session.add(user)session.commit()
except IntegrityError as e:print(f"約束違反: {e}")# 處理重復數據或約束違反# 3. 解決"PendingRollbackError"錯誤
try:with Session() as session:# 可能出錯的操作user = User(username=None) # 違反非空約束session.add(user)session.commit()
except Exception as e:# 必須回滾事務session.rollback()print(f"操作失敗,已回滾: {e}")# 4. 解決"StatementError"參數錯誤
# 錯誤方式
# session.query(User).filter(User.id == None) # 應該使用is_(None)# 正確方式
session.query(User).filter(User.id.is_(None))
session.query(User).filter(User.id.isnot(None))
10.2 性能問題診斷
import time
from sqlalchemy import event
from sqlalchemy.engine import Engine# SQL執行時間監控
@event.listens_for(Engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):context._query_start_time = time.time()@event.listens_for(Engine, "after_cursor_execute")
def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):total = time.time() - context._query_start_timeif total > 0.1: # 記錄超過100ms的查詢print(f"慢查詢 ({total:.3f}s): {statement[:100]}...")# 查詢分析器
class QueryAnalyzer:"""查詢分析器"""def __init__(self, session):self.session = sessionself.queries = []def __enter__(self):# 開始監控event.listen(self.session.bind, "before_cursor_execute", self._before_execute)event.listen(self.session.bind, "after_cursor_execute", self._after_execute)return selfdef __exit__(self, exc_type, exc_val, exc_tb):# 停止監控event.remove(self.session.bind, "before_cursor_execute", self._before_execute)event.remove(self.session.bind, "after_cursor_execute", self._after_execute)# 輸出分析結果self.print_analysis()def _before_execute(self, conn, cursor, statement, parameters, context, executemany):context._start_time = time.time()def _after_execute(self, conn, cursor, statement, parameters, context, executemany):duration = time.time() - context._start_timeself.queries.append({'statement': statement,'duration': duration,'parameters': parameters})def print_analysis(self):"""打印分析結果"""total_queries = len(self.queries)total_time = sum(q['duration'] for q in self.queries)avg_time = total_time / total_queries if total_queries > 0 else 0print(f"\n=== 查詢分析結果 ===")print(f"總查詢數: {total_queries}")print(f"總耗時: {total_time:.3f}s")print(f"平均耗時: {avg_time:.3f}s")# 顯示最慢的查詢slow_queries = sorted(self.queries, key=lambda x: x['duration'], reverse=True)[:5]print(f"\n最慢的5個查詢:")for i, query in enumerate(slow_queries, 1):print(f"{i}. {query['duration']:.3f}s - {query['statement'][:100]}...")# 使用查詢分析器
with Session() as session:with QueryAnalyzer(session) as analyzer:# 執行需要分析的查詢users = session.query(User).all()for user in users:articles = user.articles # 可能產生N+1查詢
10.3 數據一致性保證
from sqlalchemy import event
from sqlalchemy.orm import Session
from datetime import datetime# 自動更新時間戳
@event.listens_for(User, 'before_update')
def update_timestamp(mapper, connection, target):"""更新前自動設置更新時間"""target.updated_at = datetime.now()# 軟刪除實現
class SoftDeleteMixin:"""軟刪除混入類"""deleted_at = Column(DateTime, nullable=True)is_deleted = Column(Boolean, default=False)def soft_delete(self):"""軟刪除"""self.is_deleted = Trueself.deleted_at = datetime.now()def restore(self):"""恢復刪除"""self.is_deleted = Falseself.deleted_at = Noneclass User(Base, SoftDeleteMixin):__tablename__ = 'users'# ... 其他字段# 查詢時自動過濾已刪除記錄
@event.listens_for(Session, 'after_attach')
def auto_filter_deleted(session, instance):"""自動過濾已刪除記錄"""if hasattr(instance.__class__, 'is_deleted'):session.query(instance.__class__).filter(instance.__class__.is_deleted == False)# 數據驗證
from sqlalchemy.orm import validatesclass User(Base):__tablename__ = 'users'email = Column(String(100), nullable=False)@validates('email')def validate_email(self, key, address):"""驗證郵箱格式"""import repattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'if not re.match(pattern, address):raise ValueError("郵箱格式不正確")return address
10.4 調試技巧
# 1. 啟用SQL日志
engine = create_engine('sqlite:///example.db', echo=True)# 2. 自定義日志格式
import logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)# 3. 查看生成的SQL
from sqlalchemy.dialects import mysql, postgresql, sqlitequery = session.query(User).filter(User.is_active == True)# 查看不同數據庫的SQL
print("MySQL SQL:", query.statement.compile(dialect=mysql.dialect()))
print("PostgreSQL SQL:", query.statement.compile(dialect=postgresql.dialect()))
print("SQLite SQL:", query.statement.compile(dialect=sqlite.dialect()))# 4. 使用explain分析查詢計劃
def explain_query(session, query):"""分析查詢執行計劃"""sql = str(query.statement.compile(compile_kwargs={"literal_binds": True}))explain_sql = f"EXPLAIN QUERY PLAN {sql}"result = session.execute(explain_sql)print("查詢執行計劃:")for row in result:print(row)# 使用示例
with Session() as session:query = session.query(User).filter(User.is_active == True)explain_query(session, query)
總結
本教程全面介紹了SQLAlchemy的核心概念、基本用法和高級特性。通過學習本教程,你應該能夠:
- 理解SQLAlchemy架構:掌握ORM層、Core層、Engine層的作用和關系
- 熟練使用基本功能:數據模型定義、CRUD操作、查詢語法
- 掌握高級特性:復雜查詢、關系映射、事務管理、性能優化
- 應用最佳實踐:連接池配置、緩存策略、錯誤處理、調試技巧
- 構建實際項目:通過博客系統示例了解完整的開發流程
學習建議
- 循序漸進:從基礎概念開始,逐步深入高級特性
- 動手實踐:運行示例代碼,修改參數觀察結果
- 閱讀文檔:結合官方文檔深入理解細節
- 項目實戰:在實際項目中應用所學知識
- 持續學習:關注SQLAlchemy更新,學習新特性
進階方向
- 異步編程:學習SQLAlchemy的異步支持
- 微服務架構:在分布式系統中使用SQLAlchemy
- 數據庫優化:深入學習數據庫性能調優
- 框架集成:與Flask、FastAPI、Django等框架集成
- 數據遷移:使用Alembic進行數據庫版本管理
希望這份教程能夠幫助你掌握SQLAlchemy,在Python數據庫開發中游刃有余!