【自記】Python 的 SQLAlchemy 完整實踐教程

目錄

  1. SQLAlchemy 介紹
  2. 環境準備與安裝
  3. 數據庫連接
  4. 數據模型定義
  5. 基本數據操作
  6. 復雜查詢操作
  7. 高級特性
  8. 實戰項目示例
  9. 性能優化與最佳實踐
  10. 常見問題與解決方案

1. SQLAlchemy 介紹

1.1 什么是SQLAlchemy

SQLAlchemy 是一個用于 Python 的 SQL 工具和對象關系映射(ORM)庫。它允許開發者通過 Python 代碼來與關系型數據庫交互,而不必直接編寫SQL語句。

對象關系映射(ORM) 是一種程序設計技術,用于實現面向對象編程語言里不同類型系統的數據之間的轉換。簡單來說,就是將數據庫表映射為Python類,將表中的記錄映射為類的實例。

1.2 SQLAlchemy 架構

SQLAlchemy 采用分層架構設計:

┌─────────────────────────────────────┐
│            ORM 層                   │
│  (對象關系映射 - 高級抽象)            │
├─────────────────────────────────────┤
│           Core 層                   │
│  (SQL表達式語言 - 底層抽象)           │
├─────────────────────────────────────┤
│          Engine 層                  │
│  (數據庫引擎 - 連接管理)              │
├─────────────────────────────────────┤
│         DBAPI 層                    │
│  (數據庫驅動 - 具體實現)              │
└─────────────────────────────────────┘

1.3 主要應用場景

SQLAlchemy 主要應用于以下場景:

  • 數據庫訪問和操作:提供高層抽象來操作數據庫,避免編寫原生SQL語句
  • ORM映射:建立Python類與數據庫表的映射關系,簡化數據模型操作
  • 復雜查詢:提供豐富的查詢方式,如過濾、分組、聯結等
  • 異步查詢:基于Greenlet等實現異步查詢,提高查詢效率
  • 事務控制:通過Session管理數據庫會話和事務
  • 多數據庫支持:支持PostgreSQL、MySQL、Oracle、SQLite等主流數據庫
  • Web框架集成:與Flask、FastAPI等框架無縫集成

2. 環境準備與安裝

2.1 安裝SQLAlchemy

# 安裝SQLAlchemy核心庫
pip install sqlalchemy# 安裝數據庫驅動(根據需要選擇)
pip install pymysql          # MySQL驅動
pip install psycopg2-binary  # PostgreSQL驅動
pip install cx_Oracle        # Oracle驅動
# SQLite驅動已內置在Python標準庫中

2.2 數據庫依賴對照表

數據庫類型依賴庫連接字符串示例
關系型數據庫
MySQLpymysqlmysql+pymysql://username:password@localhost:3306/database_name
PostgreSQLpsycopg2postgresql://username:password@localhost:5432/database_name
SQLite內置sqlite:///example.db
Oraclecx_Oracleoracle://username:password@localhost:1521/orcl
NoSQL數據庫
MongoDBpymongomongodb://username:password@localhost:27017/database_name
Redisredisredis://localhost:6379/0

2.3 驗證安裝

# 驗證SQLAlchemy安裝
import sqlalchemy
print(f"SQLAlchemy版本: {sqlalchemy.__version__}")# 測試數據庫連接
from sqlalchemy import create_engine# 創建內存SQLite數據庫進行測試
engine = create_engine('sqlite:///:memory:', echo=True)
print("數據庫連接測試成功!")

3. 數據庫連接

3.1 創建數據庫引擎

數據庫引擎是SQLAlchemy的核心組件,負責管理數據庫連接。

from sqlalchemy import create_engine# SQLite連接(文件數據庫)
engine = create_engine('sqlite:///example.db', echo=True)# MySQL連接
dbHost = 'mysql+pymysql://root:password@127.0.0.1:3306/test'
engine = create_engine(dbHost,echo=True,              # 是否打印SQL語句pool_size=10,           # 連接池大小max_overflow=20,        # 超出連接池大小的連接數pool_pre_ping=True,     # 連接前檢查連接有效性pool_recycle=3600       # 連接回收時間(秒)
)# PostgreSQL連接
engine = create_engine('postgresql://username:password@localhost:5432/database',echo=False,pool_size=5,max_overflow=10
)

3.2 引擎參數詳解

參數說明默認值
echo是否打印執行的SQL語句False
pool_size連接池保持的連接數5
max_overflow允許超過pool_size的最大連接數10
pool_timeout獲取連接的超時時間(秒)30
pool_recycle連接在連接池中保持的最長時間(秒)-1
pool_pre_ping連接前檢查連接是否有效False

3.3 連接池管理

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool# 自定義連接池配置
engine = create_engine('mysql+pymysql://user:password@localhost/dbname',poolclass=QueuePool,pool_size=20,           # 連接池大小max_overflow=30,        # 最大溢出連接數pool_timeout=60,        # 獲取連接超時時間pool_recycle=7200,      # 連接回收時間(2小時)echo=True
)# 獲取連接池狀態信息
print(f"連接池大小: {engine.pool.size()}")
print(f"已檢出連接數: {engine.pool.checkedout()}")
print(f"溢出連接數: {engine.pool.overflow()}")

4. 數據模型定義

4.1 聲明式基類

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float, DateTime, Boolean, Text
from datetime import datetime# 創建聲明式基類
Base = declarative_base()

4.2 基礎模型定義

class User(Base):"""用戶模型類"""__tablename__ = 'users'  # 指定表名__table_args__ = {'comment': '用戶信息表'}  # 表注釋# 定義字段id = Column(Integer, primary_key=True, autoincrement=True, comment='用戶ID')username = Column(String(50), nullable=False, unique=True, comment='用戶名')email = Column(String(100), nullable=False, index=True, comment='郵箱')password_hash = Column(String(128), nullable=False, comment='密碼哈希')full_name = Column(String(100), comment='全名')is_active = Column(Boolean, default=True, comment='是否激活')created_at = Column(DateTime, default=datetime.now, comment='創建時間')updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now, comment='更新時間')def __repr__(self):return f"<User(id={self.id}, username='{self.username}')>"def __str__(self):return f"用戶: {self.username} ({self.email})"

4.3 Column常用參數說明

參數說明示例
primary_key是否為主鍵primary_key=True
nullable是否允許為空nullable=False
unique是否唯一unique=True
index是否創建索引index=True
default默認值default=0
onupdate更新時的默認值onupdate=datetime.now
autoincrement是否自增autoincrement=True
comment字段注釋comment='用戶ID'

4.4 復雜數據類型示例

from sqlalchemy import JSON, DECIMAL, Enum
from sqlalchemy.dialects.mysql import LONGTEXT
import enumclass UserStatus(enum.Enum):"""用戶狀態枚舉"""ACTIVE = "active"INACTIVE = "inactive"SUSPENDED = "suspended"class Product(Base):"""商品模型類"""__tablename__ = 'products'id = Column(Integer, primary_key=True)name = Column(String(200), nullable=False, comment='商品名稱')description = Column(Text, comment='商品描述')price = Column(DECIMAL(10, 2), nullable=False, comment='價格')stock = Column(Integer, default=0, comment='庫存數量')# JSON字段存儲額外屬性attributes = Column(JSON, comment='商品屬性')# 枚舉字段status = Column(Enum(UserStatus), default=UserStatus.ACTIVE, comment='狀態')# 長文本字段content = Column(LONGTEXT, comment='詳細內容')created_at = Column(DateTime, default=datetime.now)updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)

4.5 表關系定義

from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationshipclass Category(Base):"""商品分類模型"""__tablename__ = 'categories'id = Column(Integer, primary_key=True)name = Column(String(100), nullable=False, comment='分類名稱')description = Column(Text, comment='分類描述')# 一對多關系:一個分類有多個商品products = relationship("Product", back_populates="category")class Product(Base):"""商品模型(帶關系)"""__tablename__ = 'products'id = Column(Integer, primary_key=True)name = Column(String(200), nullable=False)category_id = Column(Integer, ForeignKey('categories.id'), comment='分類ID')# 多對一關系:多個商品屬于一個分類category = relationship("Category", back_populates="products")# 多對多關系示例
from sqlalchemy import Table# 中間表定義
user_role_association = Table('user_roles',Base.metadata,Column('user_id', Integer, ForeignKey('users.id')),Column('role_id', Integer, ForeignKey('roles.id'))
)class Role(Base):"""角色模型"""__tablename__ = 'roles'id = Column(Integer, primary_key=True)name = Column(String(50), nullable=False, unique=True)description = Column(String(200))# 多對多關系:角色和用戶users = relationship("User", secondary=user_role_association, back_populates="roles")# 更新User模型,添加角色關系
User.roles = relationship("Role", secondary=user_role_association, back_populates="users")

5. 基本數據操作

5.1 創建表結構

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base# 創建引擎
engine = create_engine('sqlite:///example.db', echo=True)# 創建所有表
Base.metadata.create_all(engine)# 刪除所有表(謹慎使用)
# Base.metadata.drop_all(engine)# 創建特定表
# User.__table__.create(engine, checkfirst=True)# 刪除特定表
# User.__table__.drop(engine, checkfirst=True)

5.2 會話管理

from sqlalchemy.orm import sessionmaker, scoped_session# 創建會話類
Session = sessionmaker(bind=engine)# 方式1:手動管理會話
session = Session()
try:# 數據庫操作user = User(username='john', email='john@example.com')session.add(user)session.commit()
except Exception as e:session.rollback()print(f"操作失敗: {e}")
finally:session.close()# 方式2:使用上下文管理器(推薦)
with Session() as session:user = User(username='jane', email='jane@example.com')session.add(user)session.commit()# 自動關閉會話# 方式3:使用scoped_session(線程安全)
SessionLocal = scoped_session(sessionmaker(bind=engine))def get_session():"""獲取數據庫會話"""return SessionLocal()def close_session():"""關閉會話"""SessionLocal.remove()

5.3 插入數據

# 單條數據插入
with Session() as session:# 創建用戶對象new_user = User(username='alice',email='alice@example.com',full_name='Alice Smith',password_hash='hashed_password_here')# 添加到會話session.add(new_user)# 提交事務session.commit()# 獲取插入后的IDprint(f"新用戶ID: {new_user.id}")# 批量插入
with Session() as session:users = [User(username='bob', email='bob@example.com', full_name='Bob Johnson'),User(username='charlie', email='charlie@example.com', full_name='Charlie Brown'),User(username='diana', email='diana@example.com', full_name='Diana Prince')]# 批量添加session.add_all(users)session.commit()print(f"批量插入了 {len(users)} 個用戶")# 使用bulk_insert_mappings(高性能批量插入)
with Session() as session:user_data = [{'username': 'user1', 'email': 'user1@example.com', 'full_name': 'User One'},{'username': 'user2', 'email': 'user2@example.com', 'full_name': 'User Two'},{'username': 'user3', 'email': 'user3@example.com', 'full_name': 'User Three'}]session.bulk_insert_mappings(User, user_data)session.commit()

5.4 查詢數據

with Session() as session:# 查詢所有用戶all_users = session.query(User).all()print(f"總用戶數: {len(all_users)}")# 查詢第一個用戶first_user = session.query(User).first()print(f"第一個用戶: {first_user}")# 根據ID查詢user_by_id = session.query(User).get(1)  # 主鍵查詢if user_by_id:print(f"ID為1的用戶: {user_by_id.username}")# 條件查詢active_users = session.query(User).filter(User.is_active == True).all()print(f"活躍用戶數: {len(active_users)}")# 單條記錄查詢(確保唯一)try:unique_user = session.query(User).filter(User.username == 'alice').one()print(f"找到用戶: {unique_user.email}")except Exception as e:print(f"查詢失敗: {e}")# 可能為空的單條查詢maybe_user = session.query(User).filter(User.username == 'nonexistent').one_or_none()if maybe_user:print(f"找到用戶: {maybe_user.username}")else:print("用戶不存在")

5.5 更新數據

with Session() as session:# 方式1:查詢后更新user = session.query(User).filter(User.username == 'alice').first()if user:user.full_name = 'Alice Johnson'  # 修改屬性user.updated_at = datetime.now()  # 更新時間session.commit()print(f"用戶 {user.username} 信息已更新")# 方式2:批量更新updated_count = session.query(User).filter(User.is_active == True).update({User.updated_at: datetime.now()})session.commit()print(f"批量更新了 {updated_count} 個用戶")# 方式3:條件更新session.query(User).filter(User.created_at < datetime(2023, 1, 1)).update({User.is_active: False,User.updated_at: datetime.now()})session.commit()

5.6 刪除數據

with Session() as session:# 方式1:查詢后刪除user_to_delete = session.query(User).filter(User.username == 'bob').first()if user_to_delete:session.delete(user_to_delete)session.commit()print(f"用戶 {user_to_delete.username} 已刪除")# 方式2:批量刪除deleted_count = session.query(User).filter(User.is_active == False).delete()session.commit()print(f"批量刪除了 {deleted_count} 個用戶")# 方式3:條件刪除session.query(User).filter(User.created_at < datetime(2022, 1, 1)).delete()session.commit()

6. 復雜查詢操作

6.1 條件查詢

from sqlalchemy import and_, or_, not_, func
from sqlalchemy.sql import textwith Session() as session:# 基本條件查詢users = session.query(User).filter(User.is_active == True).all()# 多條件查詢(AND)users = session.query(User).filter(and_(User.is_active == True,User.created_at > datetime(2023, 1, 1))).all()# 或條件查詢(OR)users = session.query(User).filter(or_(User.username.like('%admin%'),User.email.like('%@admin.com'))).all()# 非條件查詢(NOT)users = session.query(User).filter(not_(User.is_active == False)).all()# IN查詢user_ids = [1, 2, 3, 4, 5]users = session.query(User).filter(User.id.in_(user_ids)).all()# NOT IN查詢users = session.query(User).filter(~User.id.in_(user_ids)).all()# LIKE模糊查詢users = session.query(User).filter(User.username.like('%john%')).all()# ILIKE不區分大小寫查詢(PostgreSQL)users = session.query(User).filter(User.username.ilike('%JOHN%')).all()# BETWEEN范圍查詢users = session.query(User).filter(User.created_at.between(datetime(2023, 1, 1), datetime(2023, 12, 31))).all()# IS NULL查詢users = session.query(User).filter(User.full_name.is_(None)).all()# IS NOT NULL查詢users = session.query(User).filter(User.full_name.isnot(None)).all()# 原生SQL條件users = session.query(User).filter(text("username LIKE :pattern")).params(pattern='%admin%').all()

6.2 排序和分頁

with Session() as session:# 升序排序users = session.query(User).order_by(User.created_at).all()# 降序排序users = session.query(User).order_by(User.created_at.desc()).all()# 多字段排序users = session.query(User).order_by(User.is_active.desc(),  # 先按活躍狀態降序User.created_at.asc()   # 再按創建時間升序).all()# 限制結果數量recent_users = session.query(User).order_by(User.created_at.desc()).limit(10).all()# 跳過指定數量的記錄users = session.query(User).offset(20).limit(10).all()# 分頁查詢page = 1per_page = 10users = session.query(User).offset((page - 1) * per_page).limit(per_page).all()# 使用slice進行分頁(更Pythonic)users = session.query(User)[20:30]  # 獲取第21-30條記錄

6.3 聚合查詢

from sqlalchemy import func, distinctwith Session() as session:# 計數查詢total_users = session.query(func.count(User.id)).scalar()print(f"總用戶數: {total_users}")# 去重計數unique_emails = session.query(func.count(distinct(User.email))).scalar()print(f"唯一郵箱數: {unique_emails}")# 最大值、最小值latest_user = session.query(func.max(User.created_at)).scalar()earliest_user = session.query(func.min(User.created_at)).scalar()# 平均值(假設User有age字段)# avg_age = session.query(func.avg(User.age)).scalar()# 求和# total_age = session.query(func.sum(User.age)).scalar()# 分組查詢user_counts_by_status = session.query(User.is_active,func.count(User.id).label('count')).group_by(User.is_active).all()for is_active, count in user_counts_by_status:status = "活躍" if is_active else "非活躍"print(f"{status}用戶數: {count}")# HAVING子句(分組后篩選)active_email_domains = session.query(func.substr(User.email, func.instr(User.email, '@') + 1).label('domain'),func.count(User.id).label('count')).filter(User.is_active == True).group_by(func.substr(User.email, func.instr(User.email, '@') + 1)).having(func.count(User.id) > 1  # 只顯示用戶數大于1的域名).all()

6.4 連接查詢(JOIN)

with Session() as session:# 內連接(INNER JOIN)results = session.query(User, Product).join(Product, User.id == Product.user_id).all()# 左外連接(LEFT OUTER JOIN)results = session.query(User, Product).outerjoin(Product, User.id == Product.user_id).all()# 使用relationship進行連接results = session.query(User).join(User.products).all()# 連接查詢with條件results = session.query(User, Product).join(Product).filter(Product.price > 100).all()# 多表連接results = session.query(User, Product, Category).join(Product).join(Category).filter(Category.name == 'Electronics').all()# 自連接(查找同一表中的關聯數據)# 假設User表有manager_id字段manager_alias = aliased(User)results = session.query(User, manager_alias).join(manager_alias, User.manager_id == manager_alias.id).all()

6.5 子查詢

from sqlalchemy.orm import aliasedwith Session() as session:# 標量子查詢avg_price_subquery = session.query(func.avg(Product.price)).scalar_subquery()expensive_products = session.query(Product).filter(Product.price > avg_price_subquery).all()# 表子查詢user_product_count = session.query(Product.user_id,func.count(Product.id).label('product_count')).group_by(Product.user_id).subquery()# 使用子查詢結果productive_users = session.query(User).join(user_product_count, User.id == user_product_count.c.user_id).filter(user_product_count.c.product_count > 5).all()# EXISTS子查詢from sqlalchemy.sql import existsusers_with_products = session.query(User).filter(exists().where(Product.user_id == User.id)).all()# NOT EXISTS子查詢users_without_products = session.query(User).filter(~exists().where(Product.user_id == User.id)).all()# IN子查詢active_user_ids = session.query(User.id).filter(User.is_active == True).subquery()products_by_active_users = session.query(Product).filter(Product.user_id.in_(active_user_ids)).all()

6.6 窗口函數(高級查詢)

from sqlalchemy import funcwith Session() as session:# ROW_NUMBER() 窗口函數results = session.query(User.id,User.username,func.row_number().over(order_by=User.created_at.desc()).label('row_num')).all()# RANK() 窗口函數results = session.query(Product.id,Product.name,Product.price,func.rank().over(order_by=Product.price.desc()).label('price_rank')).all()# 分區窗口函數results = session.query(Product.id,Product.name,Product.category_id,Product.price,func.row_number().over(partition_by=Product.category_id,order_by=Product.price.desc()).label('category_rank')).all()# LAG/LEAD 窗口函數(獲取前一行/后一行數據)results = session.query(User.id,User.username,User.created_at,func.lag(User.created_at, 1).over(order_by=User.created_at).label('prev_created_at')).all()

7. 高級特性

7.1 事務管理

from sqlalchemy.exc import SQLAlchemyError# 基本事務管理
with Session() as session:try:# 開始事務(自動開始)user1 = User(username='user1', email='user1@example.com')user2 = User(username='user2', email='user2@example.com')session.add(user1)session.add(user2)# 提交事務session.commit()print("事務提交成功")except SQLAlchemyError as e:# 回滾事務session.rollback()print(f"事務回滾: {e}")raise# 嵌套事務(保存點)
with Session() as session:try:user = User(username='main_user', email='main@example.com')session.add(user)# 創建保存點savepoint = session.begin_nested()try:# 可能失敗的操作risky_user = User(username='risky', email='invalid_email')session.add(risky_user)session.flush()  # 強制執行SQL但不提交# 如果成功,提交保存點savepoint.commit()except Exception as e:# 回滾到保存點savepoint.rollback()print(f"保存點回滾: {e}")# 主事務提交session.commit()except Exception as e:session.rollback()print(f"主事務回滾: {e}")

7.2 懶加載與預加載

from sqlalchemy.orm import joinedload, selectinload, subqueryload# 懶加載(默認行為)
with Session() as session:user = session.query(User).first()# 訪問關聯對象時才執行查詢products = user.products  # 這里會觸發額外的SQL查詢# 預加載 - joinedload(使用JOIN)
with Session() as session:users = session.query(User).options(joinedload(User.products)).all()# 一次查詢獲取用戶和產品數據for user in users:print(f"用戶 {user.username}{len(user.products)} 個產品")# 預加載 - selectinload(使用IN查詢)
with Session() as session:users = session.query(User).options(selectinload(User.products)).all()# 兩次查詢:先查用戶,再用IN查詢產品# 預加載 - subqueryload(使用子查詢)
with Session() as session:users = session.query(User).options(subqueryload(User.products)).all()# 使用子查詢預加載關聯數據# 多層預加載
with Session() as session:users = session.query(User).options(joinedload(User.products).joinedload(Product.category)).all()# 一次查詢獲取用戶、產品和分類數據# 選擇性預加載
with Session() as session:users = session.query(User).options(selectinload(User.products).selectinload(Product.reviews)).filter(User.is_active == True).all()

7.3 緩存機制

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session# 一級緩存(Session級別)
with Session() as session:# 第一次查詢,從數據庫獲取user1 = session.query(User).get(1)# 第二次查詢,從Session緩存獲取user2 = session.query(User).get(1)# 兩個對象是同一個實例assert user1 is user2print("Session緩存生效")# 二級緩存(需要額外配置)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session
from sqlalchemy import event# 簡單的內存緩存實現
class SimpleCache:def __init__(self):self._cache = {}def get(self, key):return self._cache.get(key)def set(self, key, value):self._cache[key] = valuedef delete(self, key):self._cache.pop(key, None)cache = SimpleCache()# 查詢緩存裝飾器
def cached_query(cache_key):def decorator(func):def wrapper(*args, **kwargs):# 嘗試從緩存獲取result = cache.get(cache_key)if result is not None:print(f"從緩存獲取: {cache_key}")return result# 執行查詢result = func(*args, **kwargs)# 存入緩存cache.set(cache_key, result)print(f"存入緩存: {cache_key}")return resultreturn wrapperreturn decorator@cached_query('all_active_users')
def get_active_users(session):return session.query(User).filter(User.is_active == True).all()

7.4 數據庫遷移

# 使用Alembic進行數據庫遷移
# 首先安裝: pip install alembic# 初始化遷移環境
# alembic init migrations# 創建遷移腳本
# alembic revision --autogenerate -m "創建用戶表"# 執行遷移
# alembic upgrade head# 回滾遷移
# alembic downgrade -1# 編程方式創建遷移
from alembic import command
from alembic.config import Configdef run_migrations():"""運行數據庫遷移"""alembic_cfg = Config("alembic.ini")command.upgrade(alembic_cfg, "head")def create_migration(message):"""創建新的遷移文件"""alembic_cfg = Config("alembic.ini")command.revision(alembic_cfg, autogenerate=True, message=message)

7.5 異步支持

# SQLAlchemy 1.4+ 異步支持
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
import asyncio# 創建異步引擎
async_engine = create_async_engine("postgresql+asyncpg://user:password@localhost/dbname",echo=True
)# 創建異步會話
AsyncSessionLocal = async_sessionmaker(async_engine,class_=AsyncSession,expire_on_commit=False
)# 異步數據操作
async def create_user_async(username: str, email: str):"""異步創建用戶"""async with AsyncSessionLocal() as session:user = User(username=username, email=email)session.add(user)await session.commit()return userasync def get_users_async():"""異步獲取用戶列表"""async with AsyncSessionLocal() as session:result = await session.execute(select(User).filter(User.is_active == True))return result.scalars().all()# 運行異步函數
async def main():# 創建表async with async_engine.begin() as conn:await conn.run_sync(Base.metadata.create_all)# 創建用戶user = await create_user_async("async_user", "async@example.com")print(f"創建用戶: {user.username}")# 查詢用戶users = await get_users_async()print(f"查詢到 {len(users)} 個用戶")# 運行異步代碼
# asyncio.run(main())

8. 實戰項目示例

8.1 項目結構

blog_project/
├── main.py              # 應用入口
├── database.py          # 數據庫配置
├── models.py           # 數據模型
├── schemas.py          # 數據驗證模式
├── crud.py             # 數據庫操作
├── api.py              # API接口
└── requirements.txt    # 依賴包

8.2 數據庫配置 (database.py)

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import os# 數據庫URL配置
DATABASE_URL = os.getenv("DATABASE_URL", "sqlite:///./blog.db"
)# 創建數據庫引擎
engine = create_engine(DATABASE_URL,echo=True,  # 開發環境顯示SQLpool_pre_ping=True,pool_recycle=3600
)# 創建會話工廠
SessionLocal = sessionmaker(autocommit=False,autoflush=False,bind=engine
)# 創建基類
Base = declarative_base()# 依賴注入:獲取數據庫會話
def get_db():"""獲取數據庫會話"""db = SessionLocal()try:yield dbfinally:db.close()# 創建所有表
def create_tables():"""創建數據庫表"""Base.metadata.create_all(bind=engine)# 刪除所有表
def drop_tables():"""刪除數據庫表"""Base.metadata.drop_all(bind=engine)

8.3 數據模型 (models.py)

from sqlalchemy import Column, Integer, String, Text, DateTime, Boolean, ForeignKey, Table
from sqlalchemy.orm import relationship
from datetime import datetime
from database import Base# 多對多關系中間表:文章標簽
article_tags = Table('article_tags',Base.metadata,Column('article_id', Integer, ForeignKey('articles.id')),Column('tag_id', Integer, ForeignKey('tags.id'))
)class User(Base):"""用戶模型"""__tablename__ = 'users'id = Column(Integer, primary_key=True, index=True)username = Column(String(50), unique=True, index=True, nullable=False)email = Column(String(100), unique=True, index=True, nullable=False)hashed_password = Column(String(100), nullable=False)full_name = Column(String(100))is_active = Column(Boolean, default=True)is_superuser = Column(Boolean, default=False)created_at = Column(DateTime, default=datetime.now)updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)# 關系:一個用戶可以有多篇文章articles = relationship("Article", back_populates="author", cascade="all, delete-orphan")comments = relationship("Comment", back_populates="author", cascade="all, delete-orphan")def __repr__(self):return f"<User(id={self.id}, username='{self.username}')>"class Category(Base):"""分類模型"""__tablename__ = 'categories'id = Column(Integer, primary_key=True, index=True)name = Column(String(50), unique=True, nullable=False)description = Column(Text)created_at = Column(DateTime, default=datetime.now)# 關系:一個分類可以有多篇文章articles = relationship("Article", back_populates="category")def __repr__(self):return f"<Category(id={self.id}, name='{self.name}')>"class Tag(Base):"""標簽模型"""__tablename__ = 'tags'id = Column(Integer, primary_key=True, index=True)name = Column(String(30), unique=True, nullable=False)created_at = Column(DateTime, default=datetime.now)# 多對多關系:標簽和文章articles = relationship("Article", secondary=article_tags, back_populates="tags")def __repr__(self):return f"<Tag(id={self.id}, name='{self.name}')>"class Article(Base):"""文章模型"""__tablename__ = 'articles'id = Column(Integer, primary_key=True, index=True)title = Column(String(200), nullable=False, index=True)content = Column(Text, nullable=False)summary = Column(String(500))is_published = Column(Boolean, default=False)view_count = Column(Integer, default=0)created_at = Column(DateTime, default=datetime.now, index=True)updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)published_at = Column(DateTime)# 外鍵author_id = Column(Integer, ForeignKey('users.id'), nullable=False)category_id = Column(Integer, ForeignKey('categories.id'))# 關系author = relationship("User", back_populates="articles")category = relationship("Category", back_populates="articles")tags = relationship("Tag", secondary=article_tags, back_populates="articles")comments = relationship("Comment", back_populates="article", cascade="all, delete-orphan")def __repr__(self):return f"<Article(id={self.id}, title='{self.title[:30]}...')>"class Comment(Base):"""評論模型"""__tablename__ = 'comments'id = Column(Integer, primary_key=True, index=True)content = Column(Text, nullable=False)is_approved = Column(Boolean, default=False)created_at = Column(DateTime, default=datetime.now)# 外鍵article_id = Column(Integer, ForeignKey('articles.id'), nullable=False)author_id = Column(Integer, ForeignKey('users.id'), nullable=False)parent_id = Column(Integer, ForeignKey('comments.id'))  # 回復評論# 關系article = relationship("Article", back_populates="comments")author = relationship("User", back_populates="comments")parent = relationship("Comment", remote_side=[id])  # 自引用關系def __repr__(self):return f"<Comment(id={self.id}, content='{self.content[:30]}...')>"

8.4 數據操作層 (crud.py)

from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, func, desc
from models import User, Article, Category, Tag, Comment
from typing import List, Optional
from datetime import datetimeclass UserCRUD:"""用戶數據操作"""@staticmethoddef create_user(db: Session, username: str, email: str, password: str, full_name: str = None) -> User:"""創建用戶"""user = User(username=username,email=email,hashed_password=password,  # 實際應用中需要加密full_name=full_name)db.add(user)db.commit()db.refresh(user)return user@staticmethoddef get_user_by_id(db: Session, user_id: int) -> Optional[User]:"""根據ID獲取用戶"""return db.query(User).filter(User.id == user_id).first()@staticmethoddef get_user_by_username(db: Session, username: str) -> Optional[User]:"""根據用戶名獲取用戶"""return db.query(User).filter(User.username == username).first()@staticmethoddef get_users(db: Session, skip: int = 0, limit: int = 100) -> List[User]:"""獲取用戶列表"""return db.query(User).offset(skip).limit(limit).all()@staticmethoddef update_user(db: Session, user_id: int, **kwargs) -> Optional[User]:"""更新用戶信息"""user = db.query(User).filter(User.id == user_id).first()if user:for key, value in kwargs.items():if hasattr(user, key):setattr(user, key, value)user.updated_at = datetime.now()db.commit()db.refresh(user)return user@staticmethoddef delete_user(db: Session, user_id: int) -> bool:"""刪除用戶"""user = db.query(User).filter(User.id == user_id).first()if user:db.delete(user)db.commit()return Truereturn Falseclass ArticleCRUD:"""文章數據操作"""@staticmethoddef create_article(db: Session, title: str, content: str, author_id: int, category_id: int = None, tag_names: List[str] = None) -> Article:"""創建文章"""article = Article(title=title,content=content,author_id=author_id,category_id=category_id,summary=content[:200] + "..." if len(content) > 200 else content)# 處理標簽if tag_names:for tag_name in tag_names:tag = db.query(Tag).filter(Tag.name == tag_name).first()if not tag:tag = Tag(name=tag_name)db.add(tag)article.tags.append(tag)db.add(article)db.commit()db.refresh(article)return article@staticmethoddef get_article_by_id(db: Session, article_id: int) -> Optional[Article]:"""根據ID獲取文章"""return db.query(Article).filter(Article.id == article_id).first()@staticmethoddef get_articles(db: Session, skip: int = 0, limit: int = 20, published_only: bool = True) -> List[Article]:"""獲取文章列表"""query = db.query(Article)if published_only:query = query.filter(Article.is_published == True)return query.order_by(desc(Article.created_at)).offset(skip).limit(limit).all()@staticmethoddef get_articles_by_category(db: Session, category_id: int, skip: int = 0, limit: int = 20) -> List[Article]:"""根據分類獲取文章"""return db.query(Article).filter(and_(Article.category_id == category_id, Article.is_published == True)).order_by(desc(Article.created_at)).offset(skip).limit(limit).all()@staticmethoddef get_articles_by_tag(db: Session, tag_name: str, skip: int = 0, limit: int = 20) -> List[Article]:"""根據標簽獲取文章"""return db.query(Article).join(Article.tags).filter(and_(Tag.name == tag_name, Article.is_published == True)).order_by(desc(Article.created_at)).offset(skip).limit(limit).all()@staticmethoddef search_articles(db: Session, keyword: str, skip: int = 0, limit: int = 20) -> List[Article]:"""搜索文章"""return db.query(Article).filter(and_(or_(Article.title.contains(keyword),Article.content.contains(keyword)),Article.is_published == True)).order_by(desc(Article.created_at)).offset(skip).limit(limit).all()@staticmethoddef publish_article(db: Session, article_id: int) -> Optional[Article]:"""發布文章"""article = db.query(Article).filter(Article.id == article_id).first()if article:article.is_published = Truearticle.published_at = datetime.now()db.commit()db.refresh(article)return article@staticmethoddef increment_view_count(db: Session, article_id: int) -> Optional[Article]:"""增加文章瀏覽量"""article = db.query(Article).filter(Article.id == article_id).first()if article:article.view_count += 1db.commit()db.refresh(article)return articleclass StatisticsCRUD:"""統計數據操作"""@staticmethoddef get_article_stats(db: Session) -> dict:"""獲取文章統計信息"""total_articles = db.query(func.count(Article.id)).scalar()published_articles = db.query(func.count(Article.id)).filter(Article.is_published == True).scalar()total_views = db.query(func.sum(Article.view_count)).scalar() or 0return {"total_articles": total_articles,"published_articles": published_articles,"draft_articles": total_articles - published_articles,"total_views": total_views}@staticmethoddef get_popular_articles(db: Session, limit: int = 10) -> List[Article]:"""獲取熱門文章"""return db.query(Article).filter(Article.is_published == True).order_by(desc(Article.view_count)).limit(limit).all()@staticmethoddef get_category_stats(db: Session) -> List[dict]:"""獲取分類統計"""results = db.query(Category.name,func.count(Article.id).label('article_count')).outerjoin(Article).group_by(Category.id, Category.name).all()return [{"category": name, "count": count}for name, count in results]@staticmethoddef get_monthly_article_stats(db: Session, year: int) -> List[dict]:"""獲取月度文章統計"""results = db.query(func.extract('month', Article.created_at).label('month'),func.count(Article.id).label('count')).filter(and_(func.extract('year', Article.created_at) == year,Article.is_published == True)).group_by(func.extract('month', Article.created_at)).all()return [{"month": int(month), "count": count}for month, count in results]

8.5 API接口層 (api.py)

from fastapi import FastAPI, Depends, HTTPException, Query
from sqlalchemy.orm import Session
from database import get_db, create_tables
from crud import UserCRUD, ArticleCRUD, StatisticsCRUD
from typing import List, Optional
import uvicorn# 創建FastAPI應用
app = FastAPI(title="博客系統API",description="基于SQLAlchemy的博客系統",version="1.0.0"
)# 啟動時創建表
@app.on_event("startup")
def startup_event():create_tables()# 用戶相關接口
@app.post("/users/", summary="創建用戶")
def create_user(username: str,email: str,password: str,full_name: Optional[str] = None,db: Session = Depends(get_db)
):"""創建新用戶"""# 檢查用戶名是否已存在existing_user = UserCRUD.get_user_by_username(db, username)if existing_user:raise HTTPException(status_code=400, detail="用戶名已存在")user = UserCRUD.create_user(db, username, email, password, full_name)return {"message": "用戶創建成功", "user_id": user.id}@app.get("/users/{user_id}", summary="獲取用戶信息")
def get_user(user_id: int, db: Session = Depends(get_db)):"""根據ID獲取用戶信息"""user = UserCRUD.get_user_by_id(db, user_id)if not user:raise HTTPException(status_code=404, detail="用戶不存在")return {"id": user.id,"username": user.username,"email": user.email,"full_name": user.full_name,"is_active": user.is_active,"created_at": user.created_at}@app.get("/users/", summary="獲取用戶列表")
def get_users(skip: int = Query(0, ge=0),limit: int = Query(20, ge=1, le=100),db: Session = Depends(get_db)
):"""獲取用戶列表"""users = UserCRUD.get_users(db, skip, limit)return {"users": [{"id": user.id,"username": user.username,"email": user.email,"full_name": user.full_name,"is_active": user.is_active}for user in users],"total": len(users)}# 文章相關接口
@app.post("/articles/", summary="創建文章")
def create_article(title: str,content: str,author_id: int,category_id: Optional[int] = None,tag_names: Optional[List[str]] = None,db: Session = Depends(get_db)
):"""創建新文章"""# 驗證作者是否存在author = UserCRUD.get_user_by_id(db, author_id)if not author:raise HTTPException(status_code=404, detail="作者不存在")article = ArticleCRUD.create_article(db, title, content, author_id, category_id, tag_names or [])return {"message": "文章創建成功", "article_id": article.id}@app.get("/articles/{article_id}", summary="獲取文章詳情")
def get_article(article_id: int, db: Session = Depends(get_db)):"""獲取文章詳情"""article = ArticleCRUD.get_article_by_id(db, article_id)if not article:raise HTTPException(status_code=404, detail="文章不存在")# 增加瀏覽量ArticleCRUD.increment_view_count(db, article_id)return {"id": article.id,"title": article.title,"content": article.content,"summary": article.summary,"is_published": article.is_published,"view_count": article.view_count,"created_at": article.created_at,"author": {"id": article.author.id,"username": article.author.username},"category": {"id": article.category.id,"name": article.category.name} if article.category else None,"tags": [{"id": tag.id, "name": tag.name}for tag in article.tags]}@app.get("/articles/", summary="獲取文章列表")
def get_articles(skip: int = Query(0, ge=0),limit: int = Query(20, ge=1, le=100),published_only: bool = Query(True),db: Session = Depends(get_db)
):"""獲取文章列表"""articles = ArticleCRUD.get_articles(db, skip, limit, published_only)return {"articles": [{"id": article.id,"title": article.title,"summary": article.summary,"view_count": article.view_count,"created_at": article.created_at,"author": article.author.username}for article in articles],"total": len(articles)}@app.get("/search/articles/", summary="搜索文章")
def search_articles(keyword: str = Query(..., min_length=1),skip: int = Query(0, ge=0),limit: int = Query(20, ge=1, le=100),db: Session = Depends(get_db)
):"""搜索文章"""articles = ArticleCRUD.search_articles(db, keyword, skip, limit)return {"keyword": keyword,"articles": [{"id": article.id,"title": article.title,"summary": article.summary,"view_count": article.view_count,"created_at": article.created_at}for article in articles],"total": len(articles)}# 統計相關接口
@app.get("/statistics/overview", summary="獲取統計概覽")
def get_statistics_overview(db: Session = Depends(get_db)):"""獲取統計概覽"""stats = StatisticsCRUD.get_article_stats(db)category_stats = StatisticsCRUD.get_category_stats(db)popular_articles = StatisticsCRUD.get_popular_articles(db, 5)return {"article_stats": stats,"category_stats": category_stats,"popular_articles": [{"id": article.id,"title": article.title,"view_count": article.view_count}for article in popular_articles]}if __name__ == "__main__":uvicorn.run(app, host="0.0.0.0", port=8000)

8.6 應用入口 (main.py)

from database import create_tables, get_db
from crud import UserCRUD, ArticleCRUD
from models import User, Article, Category, Tag
from datetime import datetimedef init_sample_data():"""初始化示例數據"""# 獲取數據庫會話db = next(get_db())try:# 創建示例用戶admin_user = UserCRUD.create_user(db, "admin", "admin@blog.com", "hashed_password", "管理員")author_user = UserCRUD.create_user(db, "author", "author@blog.com", "hashed_password", "作者")# 創建示例分類tech_category = Category(name="技術", description="技術相關文章")life_category = Category(name="生活", description="生活隨筆")db.add_all([tech_category, life_category])db.commit()# 創建示例文章article1 = ArticleCRUD.create_article(db,title="SQLAlchemy入門教程",content="這是一篇關于SQLAlchemy的詳細教程...",author_id=author_user.id,category_id=tech_category.id,tag_names=["Python", "數據庫", "ORM"])article2 = ArticleCRUD.create_article(db,title="我的編程之路",content="分享我學習編程的心得體會...",author_id=author_user.id,category_id=life_category.id,tag_names=["編程", "心得"])# 發布文章ArticleCRUD.publish_article(db, article1.id)ArticleCRUD.publish_article(db, article2.id)print("示例數據初始化完成!")except Exception as e:print(f"初始化數據失敗: {e}")db.rollback()finally:db.close()def main():"""主函數"""print("=== SQLAlchemy博客系統 ===")# 創建數據庫表print("創建數據庫表...")create_tables()# 初始化示例數據print("初始化示例數據...")init_sample_data()print("\n系統啟動完成!")print("API文檔地址: http://localhost:8000/docs")print("啟動API服務器: python api.py")if __name__ == "__main__":main()

9. 性能優化與最佳實踐

9.1 查詢優化

# 1. 使用索引
class User(Base):__tablename__ = 'users'id = Column(Integer, primary_key=True)username = Column(String(50), index=True)  # 單列索引email = Column(String(100), index=True)created_at = Column(DateTime, index=True)# 復合索引__table_args__ = (Index('idx_username_email', 'username', 'email'),Index('idx_active_created', 'is_active', 'created_at'),)# 2. 預加載關聯數據
from sqlalchemy.orm import joinedload, selectinload# 避免N+1查詢問題
with Session() as session:# 錯誤方式:會產生N+1查詢users = session.query(User).all()for user in users:print(f"{user.username}: {len(user.articles)} 篇文章")  # 每次都查詢# 正確方式:使用預加載users = session.query(User).options(selectinload(User.articles)).all()for user in users:print(f"{user.username}: {len(user.articles)} 篇文章")  # 不會額外查詢# 3. 使用批量操作
with Session() as session:# 批量插入users_data = [{'username': f'user{i}', 'email': f'user{i}@example.com'}for i in range(1000)]session.bulk_insert_mappings(User, users_data)# 批量更新session.query(User).filter(User.created_at < datetime(2023, 1, 1)).update({'is_active': False})session.commit()# 4. 使用原生SQL進行復雜查詢
from sqlalchemy import textwith Session() as session:# 復雜統計查詢使用原生SQLresult = session.execute(text("""SELECT c.name as category_name,COUNT(a.id) as article_count,AVG(a.view_count) as avg_viewsFROM categories cLEFT JOIN articles a ON c.id = a.category_idWHERE a.is_published = trueGROUP BY c.id, c.nameORDER BY article_count DESC"""))for row in result:print(f"分類: {row.category_name}, 文章數: {row.article_count}, 平均瀏覽: {row.avg_views}")

9.2 連接池優化

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool# 生產環境連接池配置
engine = create_engine(DATABASE_URL,# 連接池配置poolclass=QueuePool,pool_size=20,           # 連接池大小max_overflow=30,        # 最大溢出連接pool_timeout=60,        # 獲取連接超時pool_recycle=3600,      # 連接回收時間pool_pre_ping=True,     # 連接前檢查# 性能優化echo=False,             # 生產環境關閉SQL日志future=True,            # 使用2.0風格API# 連接參數connect_args={"charset": "utf8mb4","autocommit": False,"check_same_thread": False  # SQLite專用}
)# 監控連接池狀態
def monitor_pool_status(engine):"""監控連接池狀態"""pool = engine.poolprint(f"連接池大小: {pool.size()}")print(f"已檢出連接: {pool.checkedout()}")print(f"溢出連接: {pool.overflow()}")print(f"無效連接: {pool.invalidated()}")

9.3 緩存策略

import redis
import json
from functools import wraps
from typing import Any, Optional# Redis緩存配置
redis_client = redis.Redis(host='localhost',port=6379,db=0,decode_responses=True
)def cache_result(key_prefix: str, expire_time: int = 3600):"""結果緩存裝飾器"""def decorator(func):@wraps(func)def wrapper(*args, **kwargs):# 生成緩存鍵cache_key = f"{key_prefix}:{hash(str(args) + str(kwargs))}"# 嘗試從緩存獲取cached_result = redis_client.get(cache_key)if cached_result:return json.loads(cached_result)# 執行函數result = func(*args, **kwargs)# 存入緩存redis_client.setex(cache_key,expire_time,json.dumps(result, default=str))return resultreturn wrapperreturn decorator# 使用緩存
class CachedArticleCRUD(ArticleCRUD):"""帶緩存的文章操作"""@staticmethod@cache_result("hot_articles", 1800)  # 緩存30分鐘def get_hot_articles(db: Session, limit: int = 10) -> List[dict]:"""獲取熱門文章(帶緩存)"""articles = db.query(Article).filter(Article.is_published == True).order_by(Article.view_count.desc()).limit(limit).all()return [{"id": article.id,"title": article.title,"view_count": article.view_count}for article in articles]@staticmethoddef invalidate_article_cache(article_id: int):"""清除文章相關緩存"""patterns = [f"article:{article_id}:*","hot_articles:*","recent_articles:*"]for pattern in patterns:keys = redis_client.keys(pattern)if keys:redis_client.delete(*keys)

9.4 數據庫分頁優化

from sqlalchemy import func
from typing import Tuple, Listclass PaginationHelper:"""分頁助手類"""@staticmethoddef paginate_query(query, page: int, per_page: int) -> Tuple[List, dict]:"""查詢分頁"""# 計算總數total = query.count()# 計算分頁信息total_pages = (total + per_page - 1) // per_pagehas_prev = page > 1has_next = page < total_pages# 獲取當前頁數據items = query.offset((page - 1) * per_page).limit(per_page).all()pagination_info = {"page": page,"per_page": per_page,"total": total,"total_pages": total_pages,"has_prev": has_prev,"has_next": has_next,"prev_page": page - 1 if has_prev else None,"next_page": page + 1 if has_next else None}return items, pagination_info@staticmethoddef cursor_paginate(query, cursor_field, cursor_value=None, limit: int = 20, desc: bool = True):"""游標分頁(適合大數據量)"""if cursor_value is not None:if desc:query = query.filter(cursor_field < cursor_value)else:query = query.filter(cursor_field > cursor_value)if desc:query = query.order_by(cursor_field.desc())else:query = query.order_by(cursor_field.asc())items = query.limit(limit + 1).all()has_more = len(items) > limitif has_more:items = items[:-1]next_cursor = Noneif has_more and items:next_cursor = getattr(items[-1], cursor_field.name)return items, {"has_more": has_more,"next_cursor": next_cursor,"limit": limit}# 使用示例
with Session() as session:# 傳統分頁query = session.query(Article).filter(Article.is_published == True)articles, pagination = PaginationHelper.paginate_query(query, page=1, per_page=20)# 游標分頁(適合實時數據)articles, cursor_info = PaginationHelper.cursor_paginate(query, Article.created_at, limit=20)

10. 常見問題與解決方案

10.1 常見錯誤及解決方案

# 1. 解決"DetachedInstanceError"錯誤
from sqlalchemy.orm import make_transientwith Session() as session:user = session.query(User).first()# 會話關閉后訪問關聯對象會報錯# 解決方案1:在會話內訪問所有需要的數據
with Session() as session:user = session.query(User).options(joinedload(User.articles)).first()# 在會話內訪問關聯數據articles = user.articles# 解決方案2:使用expunge_all()或merge()
with Session() as session:user = session.query(User).first()session.expunge(user)  # 從會話中移除對象# 在新會話中重新附加
with Session() as session:user = session.merge(user)  # 重新附加到會話articles = user.articles# 2. 解決"IntegrityError"約束違反錯誤
try:with Session() as session:user = User(username="existing_user", email="test@example.com")session.add(user)session.commit()
except IntegrityError as e:print(f"約束違反: {e}")# 處理重復數據或約束違反# 3. 解決"PendingRollbackError"錯誤
try:with Session() as session:# 可能出錯的操作user = User(username=None)  # 違反非空約束session.add(user)session.commit()
except Exception as e:# 必須回滾事務session.rollback()print(f"操作失敗,已回滾: {e}")# 4. 解決"StatementError"參數錯誤
# 錯誤方式
# session.query(User).filter(User.id == None)  # 應該使用is_(None)# 正確方式
session.query(User).filter(User.id.is_(None))
session.query(User).filter(User.id.isnot(None))

10.2 性能問題診斷

import time
from sqlalchemy import event
from sqlalchemy.engine import Engine# SQL執行時間監控
@event.listens_for(Engine, "before_cursor_execute")
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):context._query_start_time = time.time()@event.listens_for(Engine, "after_cursor_execute")
def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):total = time.time() - context._query_start_timeif total > 0.1:  # 記錄超過100ms的查詢print(f"慢查詢 ({total:.3f}s): {statement[:100]}...")# 查詢分析器
class QueryAnalyzer:"""查詢分析器"""def __init__(self, session):self.session = sessionself.queries = []def __enter__(self):# 開始監控event.listen(self.session.bind, "before_cursor_execute", self._before_execute)event.listen(self.session.bind, "after_cursor_execute", self._after_execute)return selfdef __exit__(self, exc_type, exc_val, exc_tb):# 停止監控event.remove(self.session.bind, "before_cursor_execute", self._before_execute)event.remove(self.session.bind, "after_cursor_execute", self._after_execute)# 輸出分析結果self.print_analysis()def _before_execute(self, conn, cursor, statement, parameters, context, executemany):context._start_time = time.time()def _after_execute(self, conn, cursor, statement, parameters, context, executemany):duration = time.time() - context._start_timeself.queries.append({'statement': statement,'duration': duration,'parameters': parameters})def print_analysis(self):"""打印分析結果"""total_queries = len(self.queries)total_time = sum(q['duration'] for q in self.queries)avg_time = total_time / total_queries if total_queries > 0 else 0print(f"\n=== 查詢分析結果 ===")print(f"總查詢數: {total_queries}")print(f"總耗時: {total_time:.3f}s")print(f"平均耗時: {avg_time:.3f}s")# 顯示最慢的查詢slow_queries = sorted(self.queries, key=lambda x: x['duration'], reverse=True)[:5]print(f"\n最慢的5個查詢:")for i, query in enumerate(slow_queries, 1):print(f"{i}. {query['duration']:.3f}s - {query['statement'][:100]}...")# 使用查詢分析器
with Session() as session:with QueryAnalyzer(session) as analyzer:# 執行需要分析的查詢users = session.query(User).all()for user in users:articles = user.articles  # 可能產生N+1查詢

10.3 數據一致性保證

from sqlalchemy import event
from sqlalchemy.orm import Session
from datetime import datetime# 自動更新時間戳
@event.listens_for(User, 'before_update')
def update_timestamp(mapper, connection, target):"""更新前自動設置更新時間"""target.updated_at = datetime.now()# 軟刪除實現
class SoftDeleteMixin:"""軟刪除混入類"""deleted_at = Column(DateTime, nullable=True)is_deleted = Column(Boolean, default=False)def soft_delete(self):"""軟刪除"""self.is_deleted = Trueself.deleted_at = datetime.now()def restore(self):"""恢復刪除"""self.is_deleted = Falseself.deleted_at = Noneclass User(Base, SoftDeleteMixin):__tablename__ = 'users'# ... 其他字段# 查詢時自動過濾已刪除記錄
@event.listens_for(Session, 'after_attach')
def auto_filter_deleted(session, instance):"""自動過濾已刪除記錄"""if hasattr(instance.__class__, 'is_deleted'):session.query(instance.__class__).filter(instance.__class__.is_deleted == False)# 數據驗證
from sqlalchemy.orm import validatesclass User(Base):__tablename__ = 'users'email = Column(String(100), nullable=False)@validates('email')def validate_email(self, key, address):"""驗證郵箱格式"""import repattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'if not re.match(pattern, address):raise ValueError("郵箱格式不正確")return address

10.4 調試技巧

# 1. 啟用SQL日志
engine = create_engine('sqlite:///example.db', echo=True)# 2. 自定義日志格式
import logging
logging.basicConfig()
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)# 3. 查看生成的SQL
from sqlalchemy.dialects import mysql, postgresql, sqlitequery = session.query(User).filter(User.is_active == True)# 查看不同數據庫的SQL
print("MySQL SQL:", query.statement.compile(dialect=mysql.dialect()))
print("PostgreSQL SQL:", query.statement.compile(dialect=postgresql.dialect()))
print("SQLite SQL:", query.statement.compile(dialect=sqlite.dialect()))# 4. 使用explain分析查詢計劃
def explain_query(session, query):"""分析查詢執行計劃"""sql = str(query.statement.compile(compile_kwargs={"literal_binds": True}))explain_sql = f"EXPLAIN QUERY PLAN {sql}"result = session.execute(explain_sql)print("查詢執行計劃:")for row in result:print(row)# 使用示例
with Session() as session:query = session.query(User).filter(User.is_active == True)explain_query(session, query)

總結

本教程全面介紹了SQLAlchemy的核心概念、基本用法和高級特性。通過學習本教程,你應該能夠:

  1. 理解SQLAlchemy架構:掌握ORM層、Core層、Engine層的作用和關系
  2. 熟練使用基本功能:數據模型定義、CRUD操作、查詢語法
  3. 掌握高級特性:復雜查詢、關系映射、事務管理、性能優化
  4. 應用最佳實踐:連接池配置、緩存策略、錯誤處理、調試技巧
  5. 構建實際項目:通過博客系統示例了解完整的開發流程

學習建議

  1. 循序漸進:從基礎概念開始,逐步深入高級特性
  2. 動手實踐:運行示例代碼,修改參數觀察結果
  3. 閱讀文檔:結合官方文檔深入理解細節
  4. 項目實戰:在實際項目中應用所學知識
  5. 持續學習:關注SQLAlchemy更新,學習新特性

進階方向

  • 異步編程:學習SQLAlchemy的異步支持
  • 微服務架構:在分布式系統中使用SQLAlchemy
  • 數據庫優化:深入學習數據庫性能調優
  • 框架集成:與Flask、FastAPI、Django等框架集成
  • 數據遷移:使用Alembic進行數據庫版本管理

希望這份教程能夠幫助你掌握SQLAlchemy,在Python數據庫開發中游刃有余!

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/pingmian/96085.shtml
繁體地址,請注明出處:http://hk.pswp.cn/pingmian/96085.shtml
英文地址,請注明出處:http://en.pswp.cn/pingmian/96085.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

springboot rabbitmq 延時隊列消息確認收貨訂單已完成

供應商后臺-點擊發貨-默認3天自動收貨確認&#xff0c;更新訂單狀態已完成。1 pom.xml 引入依賴&#xff1a;<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>2 …

Linux內核TCP輸出引擎:深入解析數據傳輸的核心機制

引言 傳輸控制協議(TCP)作為互聯網最重要的基礎協議之一,其實現質量直接關系到網絡性能和應用體驗。在Linux內核中,TCP協議的輸出引擎是實現可靠數據傳輸的核心組件,負責將應用層數據高效、可靠地傳輸到網絡對端。本文將深入分析Linux內核中TCP輸出引擎的關鍵機制和實現原…

數據倉庫詳解

數據倉庫詳解第一節 數據倉庫構建方法論和實踐一、數據倉庫與數據庫的區別二、數據倉庫對于企業的價值三、數據倉庫的模型構建1、數據倉庫構建需要考慮的問題2、什么是數倉的數據模型3、如何構建數倉的數據模型&#xff08;1&#xff09;概念模型設計&#xff08;2&#xff09;…

單身杯1(web)

web簽到<?php# -*- coding: utf-8 -*- # Author: h1xa # Date: 2022-03-19 12:10:55 # Last Modified by: h1xa # Last Modified time: 2022-03-19 13:27:18 # email: h1xactfer.com # link: https://ctfer.comerror_reporting(0); highlight_file(__FILE__);$file $_…

RNN/LSTM/GRU/Transformer

RNN的局限1&#xff1a;長期依賴&#xff08;Long-TermDependencies&#xff09;問題但是同樣會有一些更加復雜的場景。比如我們試著去預測“I grew up in France...I speak fluent French”最后的詞“French”。當前的信息建議下一個詞可能是一種語言的名字&#xff0c;但是如…

瀏覽器開發CEFSharp+X86 (十六)網頁讀取電子秤數據——仙盟創夢IDE

一、東方仙盟智能瀏覽器&#xff1a;跨平臺&#xff0c;暢連百種硬件&#xff0c;速啟現場編譯東方仙盟 VOS 智能瀏覽器在網頁調用硬件 SDK 領域堪稱卓越典范。它全面兼容多平臺&#xff0c;無論是電腦、手機還是各類移動終端&#xff0c;都能完美適配&#xff0c;無縫對接。令…

騰訊云EdgeOne免費套餐:零成本開啟網站加速與安全防護

騰訊云EdgeOne免費套餐&#xff1a;零成本開啟網站加速與安全防護 ?一鍵解鎖全球3200節點&#xff0c;讓網站速度提升53%&#xff0c;同時獲得企業級安全防護作為一名站長或個人開發者&#xff0c;你是否曾為網站加載速度緩慢而苦惱&#xff1f;是否擔心網站遭遇DDoS攻擊或惡意…

服務器數據恢復—Raid6陣列崩潰導致上層分區無法訪問的數據恢復案例

服務器存儲數據恢復環境&#xff1a; 一臺infortrend某型號存儲&#xff0c;存儲設備上有12塊硬盤&#xff0c;組建一組raid6磁盤陣列。陣列上層有一個lun&#xff0c;映射到WINDOWS系統上使用。WINDOWS系統劃分了一個GUID Partition Table分區。服務器存儲故障&#xff1a; 存…

【生產故事會】Kafka 生產環境參數優化實戰案例

Kafka 3.9.1 生產環境參數優化實戰案例(8核32G HDD場景) 一、背景與硬件/業務配置 某企業級全鏈路日志采集平臺需構建高穩定Kafka集群,承擔核心業務日志流轉(涵蓋用戶行為、系統監控、交易鏈路日志),單集群3節點部署,硬件與業務特征如下: 維度 具體配置 硬件配置 C…

推薦 Eclipse Temurin 的 OpenJDK

推薦 Eclipse Temurin 的 OpenJDK 發行版 https://adoptium.net/zh-CN/temurin/releases&#xff0c;是基于其在技術可靠性、生態中立性、許可友好性和社區支持等多個維度的綜合優勢。 以下是詳細的原因&#xff0c;解釋了為什么 Eclipse Temurin 通常是基于 OpenJDK 構建的 J…

分布式3PC理論

目錄 為什么需要 3PC&#xff1f; 核心結論 3PC的優缺點 3PC與 Paxos / Raft 對比 本篇文章內容的前置知識為 分布式2PC理論&#xff0c;如果不了解&#xff0c;可點擊鏈接學習 分布式2PC理論-CSDN博客 為什么需要 3PC&#xff1f; 1) 2PC 的根本問題&#xff1a;阻塞 不…

Web 前端可視化開發工具對比 低代碼平臺、可視化搭建工具、前端可視化編輯器與在線可視化開發環境的實戰分析

在前端開發領域&#xff0c;“可視化”已經成為提升效率和降低門檻的重要方向。從 低代碼平臺 到 前端可視化編輯器&#xff0c;再到 在線可視化開發環境&#xff0c;這些工具都在改變前端的開發方式。 本文將結合真實項目&#xff0c;分析常見的 Web 前端可視化開發工具&#…

單例模式(C++)(錯誤日志實現)

單例模式一、核心原理二、常見的單例模式實現方式1. 懶漢式&#xff08;Lazy Initialization&#xff09;2. 餓漢式&#xff08;Eager Initialization&#xff09;三、關鍵實現細節解析四、單例模式的適用場景與特點使用場景日志工具&#xff08;確保日志寫入的唯一性&#xff…

stm32 鏈接腳本沒有 .gcc_except_table 段也能支持 C++ 異常

stm32 使用 cubemx 生成的 gnu ld 鏈接腳本沒有 .gcc_except_table 段。如下所示 /* ****************************************************************************** ** ** file : LinkerScript.ld ** ** author : Auto-generated by STM32CubeIDE ** ** Abst…

SpringBoot改造MCP服務器(StreamableHTTP)

項目地址&#xff1a; https://gitee.com/kylewka/smart-ai 1 項目說明 MCP&#xff08;Model Context Protocol&#xff09;協議是一個用于 AI 模型和工具之間通信的標準協議。隨著 AI 應用變得越來越復雜并被廣泛部署&#xff0c;原有的通信機制面臨著一系列挑戰。 近期 MCP …

【數學建模】煙幕干擾彈投放策略優化:模型與算法整合框架

煙幕干擾彈投放策略優化&#xff1a;模型與算法整合框架 基于文獻研究和問題需求分析&#xff0c;我們構建了完整的模型與算法整合框架。 一、整體建模框架 1. 核心問題分解 物理層&#xff1a;煙幕彈道運動與擴散特性建模博弈層&#xff1a;導彈識別與決策機制建模優化層&…

結合大數據知識體系對倉庫建模方法總結

傳統的倉庫建模理論&#xff08;如維度建模&#xff09;仍然是基石&#xff0c;但大數據的“4V”特性&#xff08;Volume, Velocity, Variety, Value&#xff09;要求我們對這些方法進行演進和補充。 以下是結合大數據知識體系對倉庫建模方法的總結&#xff1a;一、核心目標&am…

C 語言第一課:hello word c

C 語言第一課&#xff1a;hello word c開發工具創建項目快速學習平臺開發工具 個人推薦使用 jetBrains 公司的 CLion 開發工具下載地址 https://www.jetbrains.com/clion/ 創建項目 編寫代碼 //頭文件 #include <stdio.h>//程序入口 int main(){printf("hello w…

基于Java Spring Boot的云原生TodoList Demo 項目,驗證云原生核心特性

以下是一個基于 Java Spring Boot 的云原生 TodoList Demo 項目&#xff0c;涵蓋 容器化、Kubernetes 編排、CI/CD、可觀測性、彈性擴縮容 等核心云原生特性&#xff0c;代碼簡潔且附詳細操作指南&#xff0c;適合入門學習。項目概覽 目標&#xff1a;實現一個支持增刪改查&…

開源一個輕量級 Go 工具庫:go-commons

項目背景 在日常 Go 開發中&#xff0c;我們經常需要處理字符串操作和系統監控相關的功能。雖然 Go 標準庫提供了基礎的字符串處理能力&#xff0c;但在實際項目中&#xff0c;我們往往需要一些更便捷的工具函數來提高開發效率。 基于"盡可能不使用第三方依賴"的原…