FastAPI Database Configuration Best Practices

1. Basic Configuration

1.1 Database Connection Configuration
```python
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.pool import QueuePool
from sqlalchemy.exc import SQLAlchemyError
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Database URL
DATABASE_URL = "mysql+pymysql://user:password@localhost:3306/dbname"

# Connection pool settings
POOL_SIZE = 5          # default pool size
MAX_OVERFLOW = 10      # maximum overflow connections
POOL_TIMEOUT = 30      # timeout (seconds) when acquiring a connection from the pool
POOL_RECYCLE = 1800    # connection recycle time (seconds)

# Create the database engine
engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=POOL_SIZE,
    max_overflow=MAX_OVERFLOW,
    pool_timeout=POOL_TIMEOUT,
    pool_recycle=POOL_RECYCLE,
    pool_pre_ping=True,   # detect stale connections automatically
    echo=False,           # whether to log emitted SQL
)
```
1.2 Session Management

```python
# Create the session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Declarative base class
Base = declarative_base()
```
2. Connection Pool Settings Explained

2.1 Parameters

- pool_size: the number of connections the pool keeps open
  - Suggested value: (CPU cores × 2) + number of effective disks (see the sizing sketch after this list)
  - Default: 5
  - Note: bigger is not always better; tune it against real load
- max_overflow: maximum connections allowed beyond pool_size
  - Suggested value: at most 2 × pool_size
  - Purpose: absorbs traffic bursts
- pool_timeout: how long to wait for a connection from the pool
  - Unit: seconds
  - Suggested value: 30-60 seconds
  - Note: prevents waiting forever when the pool is exhausted
- pool_recycle: maximum age of a connection before it is recycled
  - Unit: seconds
  - Suggested value: 1800 seconds (30 minutes)
  - Purpose: avoids errors from the database server dropping idle connections
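A quick sketch of the sizing formula above; the disk count is an assumption for illustration (one data disk, mostly CPU-bound service):

```python
import multiprocessing

EFFECTIVE_DISKS = 1  # assumption: a single data disk; adjust for your storage layout
pool_size = multiprocessing.cpu_count() * 2 + EFFECTIVE_DISKS
print(f"suggested pool_size: {pool_size}")
```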
2.2 Connection Pool Event Listeners

```python
@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):
    logger.debug("New database connection established")

@event.listens_for(engine, "checkout")
def checkout(dbapi_connection, connection_record, connection_proxy):
    logger.debug("Database connection checked out from pool")

@event.listens_for(engine, "checkin")
def checkin(dbapi_connection, connection_record):
    logger.debug("Database connection returned to pool")
```
3. FastAPI Dependency Injection

3.1 Database Session Dependency

```python
from typing import Generator
from fastapi import Depends

def get_db() -> Generator:
    """FastAPI dependency that provides a database session.

    Usage:
        @app.get("/users/")
        def read_users(db: Session = Depends(get_db)):
            ...
    """
    db = SessionLocal()
    try:
        yield db
        db.commit()  # commit the transaction if no exception occurred
    except SQLAlchemyError as e:
        logger.error(f"Database error occurred: {str(e)}")
        db.rollback()  # roll back on database errors
        raise  # re-raise so FastAPI's exception handlers can deal with it
    except Exception as e:
        logger.error(f"Unexpected error occurred: {str(e)}")
        db.rollback()
        raise
    finally:
        db.close()  # always close the session
```
3.2 Usage Example

```python
from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session

app = FastAPI()

@app.get("/users/")
def read_users(db: Session = Depends(get_db)):
    users = db.query(User).all()
    return users

@app.post("/users/")
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    db_user = User(**user.dict())
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user
```
4. Health Checks

4.1 Database Connection Check

```python
from sqlalchemy import text

def verify_db_connection() -> bool:
    """Verify that the database is reachable.

    Useful at application startup or inside a health-check endpoint.
    """
    db = SessionLocal()
    try:
        db.execute(text("SELECT 1"))
        return True
    except Exception as e:
        logger.error(f"Database connection verification failed: {str(e)}")
        return False
    finally:
        db.close()

@app.on_event("startup")
async def startup():
    """Verify the database connection when the application starts."""
    if not verify_db_connection():
        raise Exception("Database connection failed")
```
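The same check can back a health-check endpoint; a minimal sketch (the /health route name is an illustrative choice, not from the original):

```python
from fastapi import status
from fastapi.responses import JSONResponse

@app.get("/health")
async def health_check():
    """Report service health, including database reachability."""
    if verify_db_connection():
        return {"status": "ok", "database": "up"}
    return JSONResponse(
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        content={"status": "degraded", "database": "down"},
    )
```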
5. Best Practice Recommendations

- Connection pool configuration
  - Tune pool_size against real load
  - Set a sensible pool_recycle so stale connections are not reused
  - Enable pool_pre_ping to detect dead connections automatically
- Transaction management
  - Manage transactions with context managers
  - Handle exceptions and roll back correctly
  - Avoid long-running transactions
- Performance
  - Use autoflush deliberately
  - Set an appropriate pool_timeout
  - Monitor connection pool usage
- Security
  - Keep database credentials in environment variables (see the sketch after this list)
  - Restrict database user privileges
  - Rotate database passwords regularly
- Monitoring and logging
  - Log key database operations
  - Monitor pool state
  - Choose appropriate log levels
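A minimal sketch of credential handling via environment variables; the variable names are illustrative assumptions:

```python
import os
from urllib.parse import quote_plus

# Illustrative variable names; pick whatever fits your deployment
DB_USER = os.environ["DB_USER"]
DB_PASSWORD = quote_plus(os.environ["DB_PASSWORD"])  # escape special characters
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_NAME = os.environ["DB_NAME"]

DATABASE_URL = f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:3306/{DB_NAME}"
```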
6. Handling Common Problems

- Pool exhaustion

```python
# Solution: adjust the pool configuration
engine = create_engine(
    DATABASE_URL,
    pool_size=10,      # larger pool
    max_overflow=20,   # more overflow connections
    pool_timeout=60,   # longer acquisition timeout
)
```

- Connection timeouts

```python
# Solution: add a retry mechanism
from tenacity import retry, stop_after_attempt, wait_exponential
from sqlalchemy import text

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
)
def get_db_with_retry():
    db = SessionLocal()
    db.execute(text("SELECT 1"))  # fail fast so tenacity can retry
    return db
```

- Deadlocks

```python
# Solution: cap the lock wait time (MySQL/InnoDB)
from sqlalchemy import event, text

@event.listens_for(engine, "begin")
def set_transaction_timeout(conn):
    conn.execute(text("SET innodb_lock_wait_timeout = 15"))
```
7. Summary

A well-tuned database layer is critical to the performance and reliability of a FastAPI application. With a properly configured connection pool, correct transaction handling, and solid monitoring and logging, you can build a robust data access layer. Keep adjusting and tuning these parameters against your actual workload.
8. Using the Database Without Dependency Injection

8.1 Context Manager

```python
from contextlib import contextmanager

@contextmanager
def get_db_context():
    """Provide a database session as a context manager.

    Usage:
        with get_db_context() as db:
            result = db.query(User).all()
    """
    db = SessionLocal()
    try:
        yield db
        db.commit()
    except Exception as e:
        logger.error(f"Database error: {str(e)}")
        db.rollback()
        raise
    finally:
        db.close()

# Usage examples
def get_user_by_id(user_id: int):
    with get_db_context() as db:
        return db.query(User).filter(User.id == user_id).first()

def create_new_user(user_data: dict):
    with get_db_context() as db:
        user = User(**user_data)
        db.add(user)
        db.flush()        # send the INSERT so the ID is assigned
        db.refresh(user)  # the context manager commits on exit
        return user
```
8.2 Decorator

```python
from functools import wraps

def with_db(func):
    """Database session decorator.

    Usage:
        @with_db
        def my_function(db, *args, **kwargs):
            result = db.query(User).all()
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        db = SessionLocal()
        try:
            kwargs['db'] = db
            result = func(*args, **kwargs)
            db.commit()
            return result
        except Exception as e:
            logger.error(f"Database error: {str(e)}")
            db.rollback()
            raise
        finally:
            db.close()
    return wrapper

# Usage examples
@with_db
def get_user_list(db, skip: int = 0, limit: int = 100):
    return db.query(User).offset(skip).limit(limit).all()

@with_db
def update_user(db, user_id: int, user_data: dict):
    user = db.query(User).filter(User.id == user_id).first()
    for key, value in user_data.items():
        setattr(user, key, value)
    return user  # the wrapper commits after the function returns
```
8.3 Using a Session Directly

```python
from datetime import datetime

def get_db_session():
    """Return a raw database session.

    Warning: the caller is responsible for the session's entire life cycle.
    """
    return SessionLocal()

# Usage example
def perform_complex_operation():
    db = get_db_session()
    try:
        # Run several queries
        result1 = db.query(User).all()
        result2 = db.query(Order).all()

        # Some business logic
        for user in result1:
            user.last_login = datetime.now()

        # Objects loaded in this session are already tracked;
        # a single commit persists all the changes above
        db.commit()
        return {"users": result1, "orders": result2}
    except Exception as e:
        logger.error(f"Error in complex operation: {str(e)}")
        db.rollback()
        raise
    finally:
        db.close()
```
8.4 Usage Recommendations

- Pick the right pattern
  - Simple operations: use the context manager (with get_db_context())
  - Repetitive operations: use the decorator (@with_db)
  - Complex transactions: use a session directly
- Caveats
  - Make sure the session is closed on every code path
  - Handle transactions (commit/rollback) correctly
  - Add proper error handling and logging
- Performance
  - Avoid creating and closing sessions excessively
  - Use bulk operations where appropriate
  - Keep transaction scope tight
- Best practices
  - Minimize the number of database round-trips per transaction
  - Use appropriate indexes to speed up queries
  - Monitor connection usage regularly
9. Galera Cluster Advanced Configuration

9.1 Connection Configuration

```python
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker
import random

# Galera Cluster node URLs
GALERA_NODES = [
    "mysql+pymysql://user:pass@node1:3306/dbname",
    "mysql+pymysql://user:pass@node2:3306/dbname",
    "mysql+pymysql://user:pass@node3:3306/dbname",
]

# Pool settings tuned for Galera
DB_KWARGS = {
    "pool_size": 5,
    "max_overflow": 10,
    "pool_timeout": 30,
    "pool_recycle": 1800,
    "pool_pre_ping": True,
    # Galera-specific driver options
    "connect_args": {
        "connect_timeout": 10,
        "read_timeout": 30,
        "write_timeout": 30,
        "charset": "utf8mb4",
        # Transaction isolation level
        "init_command": "SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED",
    },
}

def get_galera_engine(mode='random'):
    """Build an engine for the Galera Cluster.

    mode:
    - random: pick a random node (good for reads)
    - primary: use the primary node (good for writes)
    - all: return engines for every node (special cases)
    """
    if mode == 'random':
        db_url = random.choice(GALERA_NODES)
    elif mode == 'primary':
        db_url = GALERA_NODES[0]
    else:
        return [create_engine(url, **DB_KWARGS) for url in GALERA_NODES]
    return create_engine(db_url, **DB_KWARGS)
```
9.2 Read/Write Splitting

```python
from contextlib import contextmanager

class GaleraSessionManager:
    """Galera Cluster session manager implementing read/write splitting."""

    def __init__(self):
        self.write_engine = get_galera_engine(mode='primary')
        self.read_engines = [get_galera_engine(mode='random') for _ in range(2)]
        self.WriteSessionLocal = sessionmaker(bind=self.write_engine)
        self.ReadSessionLocal = sessionmaker(bind=random.choice(self.read_engines))

    @contextmanager
    def get_read_db(self):
        """Yield a read session."""
        db = self.ReadSessionLocal()
        try:
            yield db
        finally:
            db.close()

    @contextmanager
    def get_write_db(self):
        """Yield a write session; commits on success, rolls back on error."""
        db = self.WriteSessionLocal()
        try:
            yield db
            db.commit()
        except Exception:
            db.rollback()
            raise
        finally:
            db.close()

# Usage examples
galera_manager = GaleraSessionManager()

def read_user_profile(user_id: int):
    with galera_manager.get_read_db() as db:
        return db.query(User).filter(User.id == user_id).first()

def update_user_profile(user_id: int, data: dict):
    with galera_manager.get_write_db() as db:
        user = db.query(User).filter(User.id == user_id).first()
        for key, value in data.items():
            setattr(user, key, value)
```
9.3 Transactions and Consistency

Galera resolves write conflicts optimistically, so applications should expect deadlock and lock-wait-timeout errors and retry the whole transaction. Because a context manager cannot re-run its body, the retry logic lives in a decorator that re-executes the entire transactional function, while atomic() only delimits the transaction scope:

```python
from contextlib import contextmanager
from functools import wraps
from sqlalchemy.orm import Session
import time

class GaleraTransaction:
    """Transaction helpers for Galera-specific deadlocks and lock conflicts."""

    MAX_RETRIES = 3
    RETRY_DELAY = 1  # seconds

    @staticmethod
    @contextmanager
    def atomic(db: Session):
        """Delimit a transaction scope: flush on success, roll back on failure."""
        try:
            yield
            db.flush()
        except Exception:
            db.rollback()
            raise

    @staticmethod
    def with_retry(func):
        """Re-run a whole transactional function on deadlock or lock wait timeout."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, GaleraTransaction.MAX_RETRIES + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    message = str(e).lower()
                    retryable = "deadlock" in message or "lock wait timeout" in message
                    if not retryable or attempt == GaleraTransaction.MAX_RETRIES:
                        raise
                    time.sleep(GaleraTransaction.RETRY_DELAY)
        return wrapper

# Usage example
@GaleraTransaction.with_retry
def process_order(order_id: int):
    with galera_manager.get_write_db() as db:
        with GaleraTransaction.atomic(db):
            # Update the order status
            order = db.query(Order).filter(Order.id == order_id).first()
            order.status = 'processing'
            # Decrement inventory
            inventory = db.query(Inventory).filter(
                Inventory.product_id == order.product_id).first()
            inventory.quantity -= order.quantity
```
9.4 High Availability

```python
import asyncio
from sqlalchemy import text

class GaleraHighAvailability:
    """Galera cluster high-availability manager."""

    def __init__(self):
        self.nodes = GALERA_NODES
        self.active_nodes = set()
        self.check_interval = 30  # seconds

    async def monitor_nodes(self):
        """Poll every node and track which ones are synced."""
        while True:
            for node in self.nodes:
                try:
                    engine = create_engine(node, **DB_KWARGS)
                    with engine.connect() as conn:
                        # Check the node's cluster state
                        result = conn.execute(
                            text("SHOW STATUS LIKE 'wsrep_local_state_comment'"))
                        status = result.fetchone()[1]
                        if status == 'Synced':
                            self.active_nodes.add(node)
                        else:
                            self.active_nodes.discard(node)
                except Exception:
                    self.active_nodes.discard(node)
            await asyncio.sleep(self.check_interval)

    def get_available_node(self):
        """Return a random healthy node."""
        if not self.active_nodes:
            raise Exception("No available Galera nodes")
        return random.choice(list(self.active_nodes))

# Usage example
ha_manager = GaleraHighAvailability()

@app.on_event("startup")
async def startup_event():
    # Start node monitoring in the background
    asyncio.create_task(ha_manager.monitor_nodes())

def get_db_connection():
    """Get a session bound to a currently healthy node."""
    node = ha_manager.get_available_node()
    engine = create_engine(node, **DB_KWARGS)
    return sessionmaker(bind=engine)()
```
9.5 Performance Tuning Recommendations

- Read/write splitting
  - Route reads to a random node
  - Pin writes to the primary node
  - Balance load for batch operations
- Transactions
  - Use the READ COMMITTED isolation level
  - Retry on deadlocks
  - Keep transactions small and short-lived
- Connection pools
  - Maintain a separate pool per node
  - Size each pool according to the node's role
  - Check for and clean up idle connections periodically
- Monitoring and maintenance
  - Monitor node state in real time
  - Track replication lag (for reads that must not lag, see the sketch after this list)
  - Implement automatic failover
- Query optimization
  - Use appropriate indexes
  - Avoid long transactions
  - Throttle concurrent writes
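For reads that must observe the latest committed writes despite replication lag, Galera exposes the wsrep_sync_wait session variable; a sketch, assuming a MariaDB/Galera version that supports it:

```python
from sqlalchemy import text

def read_after_write(user_id: int):
    """Force causal consistency for one critical read."""
    with galera_manager.get_read_db() as db:
        # Wait until this node has applied all writes known cluster-wide
        db.execute(text("SET SESSION wsrep_sync_wait = 1"))
        try:
            return db.query(User).filter(User.id == user_id).first()
        finally:
            db.execute(text("SET SESSION wsrep_sync_wait = 0"))
```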
10. A Complete CRUD Example

10.1 Model Definitions

```python
from datetime import datetime
from sqlalchemy import Column, Integer, String, DateTime, Boolean
from sqlalchemy.sql import func
from pydantic import BaseModel, EmailStr
from typing import Optional

# SQLAlchemy model
class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(255), unique=True, index=True, nullable=False)
    username = Column(String(50), unique=True, index=True)
    hashed_password = Column(String(255), nullable=False)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

# Pydantic models
class UserBase(BaseModel):
    email: EmailStr
    username: str

class UserCreate(UserBase):
    password: str

class UserUpdate(BaseModel):
    email: Optional[EmailStr] = None
    username: Optional[str] = None
    is_active: Optional[bool] = None

class UserResponse(UserBase):
    id: int
    is_active: bool
    created_at: datetime
    updated_at: Optional[datetime]

    class Config:
        orm_mode = True  # Pydantic v1; use from_attributes=True on v2
```
10.2 Service Layer

```python
from passlib.context import CryptContext
from typing import List, Optional

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class UserService:
    def __init__(self, galera_manager: GaleraSessionManager):
        self.db_manager = galera_manager

    def get_user(self, user_id: int) -> Optional[User]:
        """Fetch a single user."""
        with self.db_manager.get_read_db() as db:
            return db.query(User).filter(User.id == user_id).first()

    def get_user_by_email(self, email: str) -> Optional[User]:
        """Fetch a user by email address."""
        with self.db_manager.get_read_db() as db:
            return db.query(User).filter(User.email == email).first()

    def get_users(self, skip: int = 0, limit: int = 100) -> List[User]:
        """Fetch a page of users."""
        with self.db_manager.get_read_db() as db:
            return db.query(User).offset(skip).limit(limit).all()

    def create_user(self, user: UserCreate) -> User:
        """Create a new user."""
        with self.db_manager.get_write_db() as db:
            with GaleraTransaction.atomic(db):
                # Reject duplicate email addresses
                if self.get_user_by_email(user.email):
                    raise ValueError("Email already registered")
                hashed_password = pwd_context.hash(user.password)
                db_user = User(
                    email=user.email,
                    username=user.username,
                    hashed_password=hashed_password)
                db.add(db_user)
                db.flush()  # populate the auto-increment ID
                db.refresh(db_user)
                return db_user

    def update_user(self, user_id: int, user_update: UserUpdate) -> Optional[User]:
        """Update an existing user."""
        with self.db_manager.get_write_db() as db:
            with GaleraTransaction.atomic(db):
                db_user = db.query(User).filter(User.id == user_id).first()
                if not db_user:
                    return None
                # Apply only the fields the client actually sent
                update_data = user_update.dict(exclude_unset=True)
                for key, value in update_data.items():
                    setattr(db_user, key, value)
                db.flush()
                db.refresh(db_user)
                return db_user

    def delete_user(self, user_id: int) -> bool:
        """Delete a user."""
        with self.db_manager.get_write_db() as db:
            with GaleraTransaction.atomic(db):
                db_user = db.query(User).filter(User.id == user_id).first()
                if not db_user:
                    return False
                db.delete(db_user)
                return True
```
10.3 API Routes

```python
from fastapi import APIRouter, HTTPException, status, Depends
from typing import List

router = APIRouter(prefix="/users", tags=["users"])

# Service initialization
galera_manager = GaleraSessionManager()
user_service = UserService(galera_manager)

@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate):
    try:
        return user_service.create_user(user)
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e))
    except Exception:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to create user")

@router.get("/{user_id}", response_model=UserResponse)
async def read_user(user_id: int):
    user = user_service.get_user(user_id)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found")
    return user

@router.get("/", response_model=List[UserResponse])
async def read_users(skip: int = 0, limit: int = 100):
    return user_service.get_users(skip=skip, limit=limit)

@router.put("/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user_update: UserUpdate):
    user = user_service.update_user(user_id, user_update)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found")
    return user

@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int):
    success = user_service.delete_user(user_id)
    if not success:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found")
```
10.4 Usage Examples

```bash
# 1. Create a user
curl -X POST "http://localhost:8000/users/" \
  -H "Content-Type: application/json" \
  -d '{"email": "user@example.com", "username": "testuser", "password": "secret123"}'

# 2. List users
curl "http://localhost:8000/users/?skip=0&limit=10"

# 3. Fetch a single user
curl "http://localhost:8000/users/1"

# 4. Update a user
curl -X PUT "http://localhost:8000/users/1" \
  -H "Content-Type: application/json" \
  -d '{"username": "newusername"}'

# 5. Delete a user
curl -X DELETE "http://localhost:8000/users/1"
```
10.5 Key Features

- Read/write splitting
  - Reads (get_user, get_users) go to read nodes
  - Writes (create_user, update_user, delete_user) go to the write node
- Transaction handling
  - All writes run inside GaleraTransaction.atomic
  - Deadlocks can be retried by re-running the whole operation with GaleraTransaction.with_retry
- Error handling
  - Business-rule errors (e.g., duplicate email)
  - Database errors
  - HTTP error responses
- Performance
  - Indexed columns (email, username)
  - Paginated queries
  - Partial updates of only the submitted fields
- Validation
  - Input validated with Pydantic models
  - Explicit response models
11. High Concurrency and Very Large Data Sets

11.1 High-Concurrency Optimizations

11.1.1 Connection Pool Configuration

```python
from sqlalchemy import create_engine
import multiprocessing

# Derive the pool size from the machine
CPU_COUNT = multiprocessing.cpu_count()
POOL_SIZE = CPU_COUNT * 2 + 1   # baseline connections
MAX_OVERFLOW = POOL_SIZE * 2    # maximum overflow connections

# Pool configuration for high-concurrency workloads
engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=POOL_SIZE,
    max_overflow=MAX_OVERFLOW,
    pool_timeout=30,
    pool_recycle=1800,
    pool_pre_ping=True,
    echo_pool=True,  # log pool checkouts/checkins for monitoring
)

# Pool monitoring
@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_connection, connection_record, connection_proxy):
    logger.info(f"Connection pool size: {engine.pool.size()}")
    logger.info(f"Currently checked out connections: {engine.pool.checkedout()}")
```
11.1.2 Asynchronous Queries

```python
from fastapi import FastAPI, BackgroundTasks
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.future import select

# Async engine (requires an async driver such as asyncmy)
async_engine = create_async_engine(
    "mysql+asyncmy://user:pass@localhost/db",
    pool_size=20,
    max_overflow=30,
)

async def get_async_session():
    async with AsyncSession(async_engine) as session:
        yield session

# Async query example
async def async_get_users(skip: int = 0, limit: int = 100):
    async with AsyncSession(async_engine) as session:
        query = select(User).offset(skip).limit(limit)
        result = await session.execute(query)
        return result.scalars().all()

# API implementation
@router.get("/users/async")
async def get_users_async(
    background_tasks: BackgroundTasks,
    skip: int = 0, limit: int = 100,
):
    # Push slow work into a background task
    background_tasks.add_task(process_user_data)
    return await async_get_users(skip, limit)
```
11.1.3 Concurrency Control

```python
from fastapi import HTTPException
from asyncio import Semaphore
import asyncio

# Cap the number of concurrent requests
MAX_CONCURRENT_REQUESTS = 100
semaphore = Semaphore(MAX_CONCURRENT_REQUESTS)

async def controlled_operation():
    async with semaphore:
        # perform database work here
        pass

# Distributed lock backed by Redis
from functools import wraps
from redis import Redis
from redis.lock import Lock

redis_client = Redis(host='localhost', port=6379, db=0)

def with_distributed_lock(lock_name: str, timeout: int = 10):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            lock = Lock(redis_client, lock_name, timeout=timeout)
            if not lock.acquire(blocking=True, blocking_timeout=5):
                raise HTTPException(
                    status_code=423,
                    detail="Resource is locked")
            try:
                return await func(*args, **kwargs)
            finally:
                lock.release()
        return wrapper
    return decorator

# Usage example
@router.post("/users/batch")
@with_distributed_lock("create_users_lock")
async def create_users_batch(users: List[UserCreate]):
    return await batch_create_users(users)
```
11.2 Processing Very Large Data Sets

11.2.1 Batch Processing

```python
from typing import List, Generator
from sqlalchemy.orm import Session

class BatchProcessor:
    def __init__(self, batch_size: int = 1000):
        self.batch_size = batch_size
        self.galera_manager = GaleraSessionManager()

    def process_large_dataset(self, data: List[dict]):
        """Process a large data set batch by batch."""
        for batch in self._get_batches(data):
            with self.galera_manager.get_write_db() as db:
                with GaleraTransaction.atomic(db):
                    self._process_batch(db, batch)

    def _get_batches(self, data: List[dict]) -> Generator[List[dict], None, None]:
        """Split the data set into fixed-size batches."""
        for i in range(0, len(data), self.batch_size):
            yield data[i:i + self.batch_size]

    def _process_batch(self, db: Session, batch: List[dict]):
        """Insert one batch; dict keys must match User columns."""
        try:
            # bulk_insert_mappings skips per-object ORM overhead
            db.bulk_insert_mappings(User, batch)
            db.flush()
        except Exception as e:
            logger.error(f"Error processing batch: {str(e)}")
            raise

# Usage example
@router.post("/users/bulk")
async def bulk_create_users(
    background_tasks: BackgroundTasks,
    users: List[UserCreate],
):
    processor = BatchProcessor(batch_size=1000)
    # Hand the heavy lifting to a background task
    background_tasks.add_task(
        processor.process_large_dataset,
        [u.dict() for u in users])  # bulk_insert_mappings expects dicts
    return {"message": "Bulk processing started"}
```
11.2.2 Streaming

```python
from fastapi.responses import StreamingResponse
from sqlalchemy import text
import csv
import io

class StreamProcessor:
    CHUNK_SIZE = 1000

    @staticmethod
    async def stream_large_query(query):
        """Stream a large result set in chunks instead of loading it at once."""
        with SessionLocal() as db:
            result_proxy = db.execute(query)
            while True:
                chunk = result_proxy.fetchmany(StreamProcessor.CHUNK_SIZE)
                if not chunk:
                    break
                yield chunk

    @staticmethod
    async def export_large_dataset():
        """Export a large data set as CSV, chunk by chunk."""
        output = io.StringIO()
        writer = csv.writer(output)

        # Header row (emitted with the first chunk)
        writer.writerow(["id", "email", "username", "created_at"])

        query = text("SELECT id, email, username, created_at FROM users")
        async for chunk in StreamProcessor.stream_large_query(query):
            for row in chunk:
                writer.writerow(row)
            data = output.getvalue()
            output.seek(0)
            output.truncate(0)
            yield data

# API implementation
@router.get("/users/export")
async def export_users():
    return StreamingResponse(
        StreamProcessor.export_large_dataset(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=users.csv"})
```
11.2.3 Partitioned Tables

Note that this example uses PostgreSQL declarative partitioning (PARTITION OF); MySQL/Galera would need its own PARTITION BY RANGE DDL instead.

```python
from sqlalchemy import text
import datetime

def create_partition_sql(year: int, month: int) -> str:
    """Build the DDL for one monthly partition (PostgreSQL syntax)."""
    partition_name = f"users_{year}_{month:02d}"

    # Partition range: the first day of the month up to the first day of the next
    start_date = datetime.datetime(year, month, 1)
    if month == 12:
        end_date = datetime.datetime(year + 1, 1, 1)
    else:
        end_date = datetime.datetime(year, month + 1, 1)

    return f"""
        CREATE TABLE {partition_name}
        PARTITION OF users
        FOR VALUES FROM ('{start_date}') TO ('{end_date}')
    """

class PartitionManager:
    def __init__(self):
        self.engine = create_engine(DATABASE_URL)

    def ensure_partition_exists(self, date: datetime.datetime):
        """Create the partition for the given date if it is missing."""
        partition_name = f"users_{date.year}_{date.month:02d}"

        exists_query = text("""
            SELECT EXISTS (
                SELECT FROM information_schema.tables
                WHERE table_name = :name
            )
        """)

        with self.engine.begin() as conn:
            exists = conn.execute(exists_query, {"name": partition_name}).scalar()
            if not exists:
                conn.execute(text(create_partition_sql(date.year, date.month)))

# Usage example
@router.post("/users/partitioned")
async def create_user_partitioned(user: UserCreate):
    partition_manager = PartitionManager()
    # Make sure this month's partition exists
    partition_manager.ensure_partition_exists(datetime.datetime.now())
    # Create the user
    with galera_manager.get_write_db() as db:
        with GaleraTransaction.atomic(db):
            db_user = User(**user.dict())
            db.add(db_user)
            return db_user
```
11.2.4 Query Optimization

```python
from sqlalchemy import text
from sqlalchemy.orm import joinedload, selectinload

class QueryOptimizer:
    def __init__(self, db: Session):
        self.db = db

    def optimize_large_table_query(self, last_id: int = 0):
        """Techniques for querying large tables."""
        # 1. Index hints (MySQL 8 optimizer-hint syntax)
        query = text("""
            SELECT /*+ INDEX(users email_idx) */ *
            FROM users
            WHERE email LIKE :pattern
        """)

        # 2. Relationship loading strategies (avoid N+1 queries)
        query = self.db.query(User).options(selectinload(User.orders))

        # 3. Covering-index queries: select only indexed columns
        query = self.db.query(User.id, User.email, User.username).filter(
            User.is_active == True)

        # 4. Keyset pagination: seek past the last seen ID instead of OFFSET
        query = self.db.query(User).filter(
            User.id > last_id).order_by(User.id).limit(100)

        return query

    def analyze_query(self, query: str):
        """Inspect a query plan."""
        return self.db.execute(text(f"EXPLAIN ANALYZE {query}"))

# Usage example
@router.get("/users/optimized")
async def get_users_optimized(
    db: Session = Depends(get_db),
    last_id: int = 0,
    limit: int = 100,
):
    optimizer = QueryOptimizer(db)
    query = optimizer.optimize_large_table_query(last_id)
    return query.all()
```
11.3 Performance Monitoring

```python
import time
from functools import wraps
from prometheus_client import Counter, Histogram

# Prometheus metrics
DB_OPERATION_DURATION = Histogram(
    'db_operation_duration_seconds',
    'Database operation duration',
    ['operation_type'],
)
DB_OPERATION_ERRORS = Counter(
    'db_operation_errors_total',
    'Database operation errors',
    ['operation_type'],
)

def monitor_db_operation(operation_type: str):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = await func(*args, **kwargs)
                duration = time.time() - start_time
                DB_OPERATION_DURATION.labels(
                    operation_type=operation_type).observe(duration)
                return result
            except Exception:
                DB_OPERATION_ERRORS.labels(
                    operation_type=operation_type).inc()
                raise
        return wrapper
    return decorator

# Usage example
@router.get("/users/monitored")
@monitor_db_operation("get_users")
async def get_users_monitored(skip: int = 0, limit: int = 100):
    return await async_get_users(skip, limit)
```
11.4 Best Practice Recommendations

- High concurrency
  - Size the connection pool appropriately
  - Rate-limit incoming requests
  - Use async operations
  - Use distributed locks where needed
- Large data sets
  - Process data in batches
  - Stream large results
  - Partition the data
  - Optimize query shape
- Performance monitoring
  - Instrument database operations
  - Record performance metrics
  - Set up alerting
- Scalability
  - Scale horizontally
  - Split reads and writes
  - Add a caching layer
12. Horizontal Scaling, Read/Write Splitting, and Caching

12.1 Horizontal Scaling

```python
from typing import List, Optional
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import random

class DatabaseCluster:
    """Database cluster manager."""

    def __init__(self):
        # Primary database
        self.master_db = {
            "url": "mysql+pymysql://user:pass@master:3306/db",
            "engine": None,
            "session_maker": None,
        }
        # Replicas
        self.slave_dbs = [
            {
                "url": "mysql+pymysql://user:pass@slave1:3306/db",
                "weight": 1,  # weight used for load balancing
                "engine": None,
                "session_maker": None,
            },
            {
                "url": "mysql+pymysql://user:pass@slave2:3306/db",
                "weight": 1,
                "engine": None,
                "session_maker": None,
            },
        ]
        self._initialize_connections()

    def _initialize_connections(self):
        """Create engines and session factories for every node."""
        self.master_db["engine"] = create_engine(
            self.master_db["url"],
            pool_size=5,
            max_overflow=10,
            pool_timeout=30)
        self.master_db["session_maker"] = sessionmaker(
            bind=self.master_db["engine"])

        for slave in self.slave_dbs:
            slave["engine"] = create_engine(
                slave["url"],
                pool_size=10,
                max_overflow=20,
                pool_timeout=30)
            slave["session_maker"] = sessionmaker(bind=slave["engine"])

    def get_master_session(self):
        """Session against the primary."""
        return self.master_db["session_maker"]()

    def get_slave_session(self):
        """Session against a replica, chosen by weighted random selection."""
        total_weight = sum(slave["weight"] for slave in self.slave_dbs)
        r = random.uniform(0, total_weight)
        current_weight = 0
        for slave in self.slave_dbs:
            current_weight += slave["weight"]
            if r <= current_weight:
                return slave["session_maker"]()
        # Fallback: first replica
        return self.slave_dbs[0]["session_maker"]()

# Usage example
db_cluster = DatabaseCluster()

class UserService:
    """User service demonstrating read/write splitting."""

    @staticmethod
    def get_user_by_id(user_id: int) -> Optional[User]:
        """Read path: query a replica."""
        session = db_cluster.get_slave_session()
        try:
            return session.query(User).filter(User.id == user_id).first()
        finally:
            session.close()

    @staticmethod
    def create_user(user_data: dict) -> User:
        """Write path: insert on the primary."""
        session = db_cluster.get_master_session()
        try:
            user = User(**user_data)
            session.add(user)
            session.commit()
            return user
        finally:
            session.close()
```
12.2 Read/Write Splitting Strategy

```python
from contextlib import contextmanager
from typing import Generator
from sqlalchemy.orm import Session

class ReadWriteManager:
    """Read/write splitting manager."""

    def __init__(self):
        self.db_cluster = DatabaseCluster()

    @contextmanager
    def read_session(self) -> Generator[Session, None, None]:
        """Session from a replica for reads."""
        session = self.db_cluster.get_slave_session()
        try:
            yield session
        finally:
            session.close()

    @contextmanager
    def write_session(self) -> Generator[Session, None, None]:
        """Session from the primary for writes."""
        session = self.db_cluster.get_master_session()
        try:
            yield session
            session.commit()
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

# Service layer
class EnhancedUserService:
    def __init__(self):
        self.db_manager = ReadWriteManager()

    def get_users(self, skip: int = 0, limit: int = 100) -> List[User]:
        """Read example."""
        with self.db_manager.read_session() as session:
            return session.query(User).offset(skip).limit(limit).all()

    def create_user(self, user_data: dict) -> User:
        """Write example."""
        with self.db_manager.write_session() as session:
            user = User(**user_data)
            session.add(user)
            return user

    def update_user(self, user_id: int, user_data: dict) -> Optional[User]:
        """Read-then-write example."""
        # Check existence on a replica first
        with self.db_manager.read_session() as read_session:
            user = read_session.query(User).filter(
                User.id == user_id).first()
            if not user:
                return None
        # Then apply the update on the primary
        with self.db_manager.write_session() as write_session:
            user = write_session.query(User).filter(
                User.id == user_id).first()
            for key, value in user_data.items():
                setattr(user, key, value)
            return user
```
12.3 Caching Strategy

```python
from functools import wraps
from typing import Any, List, Optional
import json
from redis import Redis
from datetime import timedelta

class CacheManager:
    """Cache manager backed by Redis."""

    def __init__(self):
        self.redis_client = Redis(
            host='localhost',
            port=6379,
            db=0,
            decode_responses=True)
        self.default_ttl = timedelta(minutes=30)

    def get(self, key: str) -> Optional[str]:
        """Read a cached value."""
        return self.redis_client.get(key)

    def set(self, key: str, value: Any, ttl: Optional[timedelta] = None):
        """Write a value with a TTL."""
        if ttl is None:
            ttl = self.default_ttl
        if isinstance(value, (dict, list)):
            value = json.dumps(value)
        self.redis_client.set(key, value, ex=int(ttl.total_seconds()))

    def delete(self, key: str):
        """Invalidate one key."""
        self.redis_client.delete(key)

    def clear_pattern(self, pattern: str):
        """Invalidate every key matching a pattern."""
        keys = self.redis_client.keys(pattern)
        if keys:
            self.redis_client.delete(*keys)

# Caching decorator
def cache_result(
    prefix: str,
    ttl: Optional[timedelta] = None,
    key_func=None,
):
    """Caching decorator.

    :param prefix: cache key prefix
    :param ttl: cache expiry
    :param key_func: custom cache-key builder
    """
    cache_manager = CacheManager()

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Build the cache key
            if key_func:
                cache_key = f"{prefix}:{key_func(*args, **kwargs)}"
            else:
                # Default: derive the key from the arguments
                # (for methods this includes `self`; pass a key_func to avoid that)
                key_parts = [str(arg) for arg in args]
                key_parts.extend(f"{k}:{v}" for k, v in sorted(kwargs.items()))
                cache_key = f"{prefix}:{':'.join(key_parts)}"

            # Try the cache first
            cached_result = cache_manager.get(cache_key)
            if cached_result:
                return json.loads(cached_result)

            # Cache miss: run the function
            result = await func(*args, **kwargs)

            # Store the result
            cache_manager.set(cache_key, result, ttl)
            return result
        return wrapper
    return decorator

# Usage example
class CachedUserService:
    def __init__(self):
        self.db_manager = ReadWriteManager()
        self.cache_manager = CacheManager()

    @cache_result(prefix="user", ttl=timedelta(minutes=10))
    async def get_user(self, user_id: int) -> Optional[dict]:
        """Cached single-user lookup."""
        with self.db_manager.read_session() as session:
            user = session.query(User).filter(User.id == user_id).first()
            if user:
                return user.to_dict()
            return None

    async def update_user(self, user_id: int, user_data: dict) -> Optional[dict]:
        """Update a user and invalidate related cache entries."""
        with self.db_manager.write_session() as session:
            user = session.query(User).filter(User.id == user_id).first()
            if not user:
                return None
            for key, value in user_data.items():
                setattr(user, key, value)
            # Invalidate the per-user entry
            self.cache_manager.delete(f"user:{user_id}")
            # Invalidate any list caches that may include this user
            self.cache_manager.clear_pattern("user_list:*")
            return user.to_dict()

    @cache_result(prefix="user_list", ttl=timedelta(minutes=5))
    async def get_user_list(
        self,
        skip: int = 0,
        limit: int = 100,
        status: Optional[str] = None,
    ) -> List[dict]:
        """Cached user-list lookup."""
        with self.db_manager.read_session() as session:
            query = session.query(User)
            if status:
                query = query.filter(User.status == status)
            users = query.offset(skip).limit(limit).all()
            return [user.to_dict() for user in users]

# FastAPI routes
@router.get("/users/{user_id}")
async def get_user(user_id: int):
    service = CachedUserService()
    user = await service.get_user(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@router.put("/users/{user_id}")
async def update_user(user_id: int, user_update: UserUpdate):
    service = CachedUserService()
    user = await service.update_user(user_id, user_update.dict())
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@router.get("/users/")
async def get_users(
    skip: int = 0,
    limit: int = 100,
    status: Optional[str] = None,
):
    service = CachedUserService()
    return await service.get_user_list(skip, limit, status)
```
12.4 Best Practice Recommendations

- Horizontal scaling
  - Size primary and replica pools appropriately
  - Implement failover
  - Monitor database load and performance
  - Check replication health regularly
- Read/write splitting
  - Clearly separate read and write operations
  - Account for replication lag
  - Preserve transactional consistency
  - Balance read/write traffic sensibly
- Caching
  - Choose an appropriate cache granularity
  - Set sensible expiry times
  - Invalidate related entries promptly
  - Guard against cache stampedes and avalanches (see the sketch after this list)
- Performance
  - Use connection pooling
  - Queue requests under pressure
  - Add monitoring and alerting
  - Tune the database regularly
- Availability
  - Implement health checks
  - Add retry mechanisms
  - Handle failures gracefully
  - Degrade gracefully under load
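One common guard against stampedes and avalanches is TTL jitter plus a per-key rebuild lock. A sketch built on the CacheManager above; the jitter range, lock key format, and the `rebuild` callable are illustrative assumptions:

```python
import json
import random
from datetime import timedelta
from redis.lock import Lock

def set_with_jitter(cache: CacheManager, key: str, value, base_ttl: timedelta):
    """Spread expiry times so many keys do not expire at once (avalanche)."""
    jitter = random.uniform(0.8, 1.2)  # illustrative +/-20% spread
    cache.set(key, value, ttl=base_ttl * jitter)

async def get_or_rebuild(cache: CacheManager, key: str, rebuild):
    """Let only one caller rebuild an expired hot key (stampede guard)."""
    cached = cache.get(key)
    if cached is not None:
        return json.loads(cached)
    lock = Lock(cache.redis_client, f"rebuild:{key}", timeout=10)
    if lock.acquire(blocking=True, blocking_timeout=5):
        try:
            value = await rebuild()  # assumed async loader, e.g. a DB query
            set_with_jitter(cache, key, value, timedelta(minutes=10))
            return value
        finally:
            lock.release()
    # Lost the race: another worker is rebuilding; re-read after the wait
    cached = cache.get(key)
    return json.loads(cached) if cached is not None else await rebuild()
```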
12.5 When to Use Each Technique

- Horizontal scaling
  - Read-heavy workloads
  - High-concurrency query traffic
  - Workloads that need data redundancy
  - Geographically distributed deployments
- Read/write splitting
  - Skewed read/write ratios
  - Read throughput needs a boost
  - Moderate consistency requirements
  - Many complex read queries
- Caching
  - Hot-spot data access
  - Computation-heavy queries
  - API response-time optimization
  - Reducing database load
- General cautions
  - Review system performance regularly
  - Monitor resource usage
  - Keep backups current
  - Have an incident response plan