FastAPI 數據庫配置最佳實踐

FastAPI 數據庫配置最佳實踐

1. 基礎配置

1.1 數據庫連接配置

from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker, declarative_base
from sqlalchemy.pool import QueuePool
from sqlalchemy.exc import SQLAlchemyError
import logging# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)# 數據庫URL配置
DATABASE_URL = "mysql+pymysql://user:password@localhost:3306/dbname"# 連接池配置
POOL_SIZE = 5  # 默認連接池大小
MAX_OVERFLOW = 10  # 最大溢出連接數
POOL_TIMEOUT = 30  # 連接池獲取超時時間
POOL_RECYCLE = 1800  # 連接回收時間(秒)# 創建數據庫引擎
engine = create_engine(DATABASE_URL,poolclass=QueuePool,pool_size=POOL_SIZE,max_overflow=MAX_OVERFLOW,pool_timeout=POOL_TIMEOUT,pool_recycle=POOL_RECYCLE,pool_pre_ping=True,  # 自動檢測失效連接echo=False  # 是否打印SQL語句
)

1.2 會話管理

# 創建會話工廠
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)# 聲明基類
Base = declarative_base()

2. 連接池配置說明

2.1 參數解釋

  • pool_size: 連接池保持的連接數

    • 建議值: (CPU核心數 × 2) + 有效磁盤數
    • 默認值: 5
    • 說明: 不是越大越好,需要根據實際負載調整
  • max_overflow: 超過 pool_size 后的最大允許連接數

    • 建議值: pool_size 的 2 倍以內
    • 作用: 處理突發流量
  • pool_timeout: 從連接池獲取連接的超時時間

    • 單位: 秒
    • 建議值: 30-60秒
    • 說明: 防止連接池耗盡時無限等待
  • pool_recycle: 連接重用的最大時間

    • 單位: 秒
    • 建議值: 1800秒(30分鐘)
    • 作用: 防止數據庫斷開空閑連接

2.2 連接池事件監聽

@event.listens_for(engine, "connect")
def connect(dbapi_connection, connection_record):logger.debug("New database connection established")@event.listens_for(engine, "checkout")
def checkout(dbapi_connection, connection_record, connection_proxy):logger.debug("Database connection checked out from pool")@event.listens_for(engine, "checkin")
def checkin(dbapi_connection, connection_record):logger.debug("Database connection returned to pool")

3. FastAPI 依賴注入

3.1 數據庫會話依賴

from typing import Generator
from fastapi import Dependsdef get_db() -> Generator:"""FastAPI 依賴函數,用于獲取數據庫會話使用方法:@app.get("/users/")def read_users(db: Session = Depends(get_db)):..."""db = SessionLocal()try:yield dbdb.commit()  # 如果沒有異常發生,提交事務except SQLAlchemyError as e:logger.error(f"Database error occurred: {str(e)}")db.rollback()  # 發生異常時回滾raise  # 重新拋出異常,讓 FastAPI 的異常處理器處理except Exception as e:logger.error(f"Unexpected error occurred: {str(e)}")db.rollback()raisefinally:db.close()  # 確保關閉會話

3.2 使用示例

from fastapi import FastAPI, Depends
from sqlalchemy.orm import Sessionapp = FastAPI()@app.get("/users/")
def read_users(db: Session = Depends(get_db)):users = db.query(User).all()return users@app.post("/users/")
def create_user(user: UserCreate, db: Session = Depends(get_db)):db_user = User(**user.dict())db.add(db_user)db.commit()db.refresh(db_user)return db_user

4. 健康檢查

4.1 數據庫連接檢查

def verify_db_connection() -> bool:"""驗證數據庫連接是否正常可以在應用啟動時或者健康檢查時使用"""try:db = SessionLocal()db.execute("SELECT 1")return Trueexcept Exception as e:logger.error(f"Database connection verification failed: {str(e)}")return Falsefinally:db.close()@app.on_event("startup")
async def startup():"""應用啟動時驗證數據庫連接"""if not verify_db_connection():raise Exception("Database connection failed")

5. 最佳實踐建議

  1. 連接池配置

    • 根據實際負載調整 pool_size
    • 設置合理的 pool_recycle 防止連接失效
    • 啟用 pool_pre_ping 自動檢測連接狀態
  2. 事務管理

    • 使用 context manager 管理事務
    • 確保正確的異常處理和回滾
    • 避免長事務
  3. 性能優化

    • 合理使用 autoflush
    • 適當設置 pool_timeout
    • 監控連接池使用情況
  4. 安全性

    • 使用環境變量管理數據庫憑證
    • 限制數據庫用戶權限
    • 定期更新數據庫密碼
  5. 監控和日志

    • 記錄關鍵數據庫操作
    • 監控連接池狀態
    • 設置合適的日志級別

6. 常見問題處理

  1. 連接池耗盡

    # 解決方案:調整連接池配置
    engine = create_engine(DATABASE_URL,pool_size=10,  # 增加連接池大小max_overflow=20,  # 增加最大溢出連接數pool_timeout=60  # 增加超時時間
    )
    
  2. 連接超時

    # 解決方案:添加重試機制
    from tenacity import retry, stop_after_attempt, wait_exponential@retry(stop=stop_after_attempt(3),wait=wait_exponential(multiplier=1, min=4, max=10)
    )
    def get_db_with_retry():return get_db()
    
  3. 死鎖處理

    # 解決方案:設置事務超時
    from sqlalchemy import event@event.listens_for(engine, "begin")
    def set_transaction_timeout(conn):conn.execute("SET innodb_lock_wait_timeout = 15")
    

7. 總結

良好的數據庫配置對于 FastAPI 應用的性能和可靠性至關重要。通過合理配置連接池、正確處理事務、實施監控和日志記錄,可以構建一個健壯的數據庫訪問層。要根據實際應用場景和負載情況,不斷調整和優化配置參數。

8. 不使用依賴注入的數據庫使用方式

8.1 使用上下文管理器

from contextlib import contextmanager@contextmanager
def get_db_context():"""使用上下文管理器獲取數據庫會話使用方式:with get_db_context() as db:result = db.query(User).all()"""db = SessionLocal()try:yield dbdb.commit()except Exception as e:logger.error(f"Database error: {str(e)}")db.rollback()raisefinally:db.close()# 使用示例
def get_user_by_id(user_id: int):with get_db_context() as db:return db.query(User).filter(User.id == user_id).first()def create_new_user(user_data: dict):with get_db_context() as db:user = User(**user_data)db.add(user)db.commit()db.refresh(user)return user

8.2 使用裝飾器

from functools import wrapsdef with_db(func):"""數據庫會話裝飾器使用方式:@with_dbdef my_function(db, *args, **kwargs):result = db.query(User).all()"""@wraps(func)def wrapper(*args, **kwargs):db = SessionLocal()try:kwargs['db'] = dbresult = func(*args, **kwargs)db.commit()return resultexcept Exception as e:logger.error(f"Database error: {str(e)}")db.rollback()raisefinally:db.close()return wrapper# 使用示例
@with_db
def get_user_list(db, skip: int = 0, limit: int = 100):return db.query(User).offset(skip).limit(limit).all()@with_db
def update_user(db, user_id: int, user_data: dict):user = db.query(User).filter(User.id == user_id).first()for key, value in user_data.items():setattr(user, key, value)db.commit()return user

8.3 直接使用會話

def get_db_session():"""獲取數據庫會話的簡單函數警告:使用此方法時需要手動管理會話的生命周期"""return SessionLocal()# 使用示例
def perform_complex_operation():db = get_db_session()try:# 執行多個數據庫操作result1 = db.query(User).all()result2 = db.query(Order).all()# 一些業務邏輯處理for user in result1:user.last_login = datetime.now()# 批量更新db.bulk_save_objects(result1)db.commit()return {"users": result1, "orders": result2}except Exception as e:logger.error(f"Error in complex operation: {str(e)}")db.rollback()raisefinally:db.close()

8.4 使用建議

  1. 選擇合適的方式

    • 簡單操作:使用上下文管理器(with get_db_context()
    • 重復性操作:使用裝飾器(@with_db
    • 復雜事務:直接使用會話
  2. 注意事項

    • 確保在所有情況下都正確關閉數據庫會話
    • 正確處理事務(提交/回滾)
    • 適當的錯誤處理和日志記錄
  3. 性能考慮

    • 避免頻繁創建和關閉會話
    • 合理使用批量操作
    • 注意事務范圍的控制
  4. 最佳實踐

    • 在同一個事務中盡量減少數據庫操作次數
    • 使用適當的索引優化查詢性能
    • 定期監控數據庫連接使用情況

7. 總結

良好的數據庫配置對于 FastAPI 應用的性能和可靠性至關重要。通過合理配置連接池、正確處理事務、實施監控和日志記錄,可以構建一個健壯的數據庫訪問層。要根據實際應用場景和負載情況,不斷調整和優化配置參數。無論是使用依賴注入還是直接使用數據庫會話,都要確保正確管理數據庫資源,處理異常情況,并遵循最佳實踐。

9. Galera Cluster 高級配置

9.1 連接配置

from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker
import random# Galera Cluster 節點配置
GALERA_NODES = ["mysql+pymysql://user:pass@node1:3306/dbname","mysql+pymysql://user:pass@node2:3306/dbname","mysql+pymysql://user:pass@node3:3306/dbname"
]def get_galera_engine(mode='random'):"""獲取 Galera Cluster 數據庫引擎mode: - random: 隨機選擇節點(適合讀操作)- primary: 使用主節點(適合寫操作)- all: 返回所有節點的引擎(特殊場景使用)"""if mode == 'random':# 隨機選擇一個節點用于讀操作db_url = random.choice(GALERA_NODES)elif mode == 'primary':# 使用主節點用于寫操作db_url = GALERA_NODES[0]else:# 返回所有節點的引擎return [create_engine(url, **DB_KWARGS) for url in GALERA_NODES]return create_engine(db_url, **DB_KWARGS)# 針對 Galera 的連接池配置
DB_KWARGS = {"pool_size": 5,"max_overflow": 10,"pool_timeout": 30,"pool_recycle": 1800,"pool_pre_ping": True,# Galera 特定配置"connect_args": {"connect_timeout": 10,"read_timeout": 30,"write_timeout": 30,"charset": "utf8mb4",# 設置事務隔離級別"init_command": "SET SESSION TRANSACTION ISOLATION LEVEL READ COMMITTED"}
}

9.2 讀寫分離實現

class GaleraSessionManager:"""Galera Cluster 會話管理器,實現讀寫分離"""def __init__(self):self.write_engine = get_galera_engine(mode='primary')self.read_engines = [get_galera_engine(mode='random') for _ in range(2)]self.WriteSessionLocal = sessionmaker(bind=self.write_engine)self.ReadSessionLocal = sessionmaker(bind=random.choice(self.read_engines))@contextmanagerdef get_read_db(self):"""獲取讀會話"""db = self.ReadSessionLocal()try:yield dbfinally:db.close()@contextmanagerdef get_write_db(self):"""獲取寫會話"""db = self.WriteSessionLocal()try:yield dbdb.commit()except Exception:db.rollback()raisefinally:db.close()# 使用示例
galera_manager = GaleraSessionManager()def read_user_profile(user_id: int):with galera_manager.get_read_db() as db:return db.query(User).filter(User.id == user_id).first()def update_user_profile(user_id: int, data: dict):with galera_manager.get_write_db() as db:user = db.query(User).filter(User.id == user_id).first()for key, value in data.items():setattr(user, key, value)

9.3 事務和一致性處理

from contextlib import contextmanager
from sqlalchemy.orm import Session
import timeclass GaleraTransaction:"""Galera 集群事務管理器"""MAX_RETRIES = 3RETRY_DELAY = 1  # 秒@staticmethod@contextmanagerdef atomic(db: Session):"""原子事務處理器,處理 Galera 特有的死鎖和沖突"""retry_count = 0while True:try:yieldbreakexcept Exception as e:if retry_count >= GaleraTransaction.MAX_RETRIES:raiseif "deadlock" in str(e).lower() or "lock wait timeout" in str(e).lower():retry_count += 1time.sleep(GaleraTransaction.RETRY_DELAY)continueraise# 使用示例
def process_order(order_id: int):with galera_manager.get_write_db() as db:with GaleraTransaction.atomic(db):# 更新訂單狀態order = db.query(Order).filter(Order.id == order_id).first()order.status = 'processing'# 更新庫存inventory = db.query(Inventory).filter(Inventory.product_id == order.product_id).first()inventory.quantity -= order.quantity

9.4 高可用性配置

class GaleraHighAvailability:"""Galera 集群高可用性管理"""def __init__(self):self.nodes = GALERA_NODESself.active_nodes = set()self.check_interval = 30  # 秒async def monitor_nodes(self):"""監控節點狀態"""while True:for node in self.nodes:try:engine = create_engine(node, **DB_KWARGS)with engine.connect() as conn:# 檢查節點狀態result = conn.execute("SHOW STATUS LIKE 'wsrep_local_state_comment'")status = result.fetchone()[1]if status == 'Synced':self.active_nodes.add(node)else:self.active_nodes.discard(node)except Exception:self.active_nodes.discard(node)await asyncio.sleep(self.check_interval)def get_available_node(self):"""獲取可用節點"""if not self.active_nodes:raise Exception("No available Galera nodes")return random.choice(list(self.active_nodes))# 使用示例
ha_manager = GaleraHighAvailability()@app.on_event("startup")
async def startup_event():# 啟動節點監控asyncio.create_task(ha_manager.monitor_nodes())def get_db_connection():"""獲取高可用數據庫連接"""node = ha_manager.get_available_node()engine = create_engine(node, **DB_KWARGS)return sessionmaker(bind=engine)()

9.5 性能優化建議

  1. 讀寫分離策略

    • 讀操作使用隨機節點
    • 寫操作固定使用主節點
    • 批量操作考慮負載均衡
  2. 事務處理

    • 使用 READ COMMITTED 隔離級別
    • 實現死鎖重試機制
    • 控制事務大小和持續時間
  3. 連接池優化

    • 為不同節點維護獨立的連接池
    • 根據節點角色調整連接池大小
    • 定期檢查并清理空閑連接
  4. 監控和維護

    • 實時監控節點狀態
    • 記錄同步延遲
    • 實現故障自動轉移
  5. 查詢優化

    • 使用合適的索引
    • 避免長事務
    • 控制并發寫入操作

7. 總結

良好的數據庫配置對于 FastAPI 應用的性能和可靠性至關重要。通過合理配置連接池、正確處理事務、實施監控和日志記錄,可以構建一個健壯的數據庫訪問層。要根據實際應用場景和負載情況,不斷調整和優化配置參數。無論是使用依賴注入還是直接使用數據庫會話,都要確保正確管理數據庫資源,處理異常情況,并遵循最佳實踐。

10. 完整的 CRUD 示例

10.1 模型定義

from sqlalchemy import Column, Integer, String, DateTime, Boolean
from sqlalchemy.sql import func
from pydantic import BaseModel, EmailStr
from typing import Optional# SQLAlchemy 模型
class User(Base):__tablename__ = "users"id = Column(Integer, primary_key=True, index=True)email = Column(String(255), unique=True, index=True, nullable=False)username = Column(String(50), unique=True, index=True)hashed_password = Column(String(255), nullable=False)is_active = Column(Boolean, default=True)created_at = Column(DateTime(timezone=True), server_default=func.now())updated_at = Column(DateTime(timezone=True), onupdate=func.now())# Pydantic 模型
class UserBase(BaseModel):email: EmailStrusername: strclass UserCreate(UserBase):password: strclass UserUpdate(BaseModel):email: Optional[EmailStr] = Noneusername: Optional[str] = Noneis_active: Optional[bool] = Noneclass UserResponse(UserBase):id: intis_active: boolcreated_at: datetimeupdated_at: Optional[datetime]class Config:orm_mode = True

10.2 服務層實現

from passlib.context import CryptContext
from typing import List, Optionalpwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")class UserService:def __init__(self, galera_manager: GaleraSessionManager):self.db_manager = galera_managerdef get_user(self, user_id: int) -> Optional[User]:"""讀取單個用戶"""with self.db_manager.get_read_db() as db:return db.query(User).filter(User.id == user_id).first()def get_user_by_email(self, email: str) -> Optional[User]:"""通過郵箱讀取用戶"""with self.db_manager.get_read_db() as db:return db.query(User).filter(User.email == email).first()def get_users(self, skip: int = 0, limit: int = 100) -> List[User]:"""讀取用戶列表"""with self.db_manager.get_read_db() as db:return db.query(User).offset(skip).limit(limit).all()def create_user(self, user: UserCreate) -> User:"""創建新用戶"""with self.db_manager.get_write_db() as db:with GaleraTransaction.atomic(db):# 檢查郵箱是否已存在if self.get_user_by_email(user.email):raise ValueError("Email already registered")# 創建用戶hashed_password = pwd_context.hash(user.password)db_user = User(email=user.email,username=user.username,hashed_password=hashed_password)db.add(db_user)db.flush()  # 獲取自增IDdb.refresh(db_user)return db_userdef update_user(self, user_id: int, user_update: UserUpdate) -> Optional[User]:"""更新用戶信息"""with self.db_manager.get_write_db() as db:with GaleraTransaction.atomic(db):db_user = db.query(User).filter(User.id == user_id).first()if not db_user:return None# 更新非空字段update_data = user_update.dict(exclude_unset=True)for key, value in update_data.items():setattr(db_user, key, value)db.flush()db.refresh(db_user)return db_userdef delete_user(self, user_id: int) -> bool:"""刪除用戶"""with self.db_manager.get_write_db() as db:with GaleraTransaction.atomic(db):db_user = db.query(User).filter(User.id == user_id).first()if not db_user:return Falsedb.delete(db_user)return True

10.3 API 路由實現

from fastapi import APIRouter, HTTPException, status, Depends
from typing import Listrouter = APIRouter(prefix="/users", tags=["users"])# 初始化服務
galera_manager = GaleraSessionManager()
user_service = UserService(galera_manager)@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(user: UserCreate):try:return user_service.create_user(user)except ValueError as e:raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST,detail=str(e))except Exception as e:raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,detail="Failed to create user")@router.get("/{user_id}", response_model=UserResponse)
async def read_user(user_id: int):user = user_service.get_user(user_id)if user is None:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,detail="User not found")return user@router.get("/", response_model=List[UserResponse])
async def read_users(skip: int = 0, limit: int = 100):return user_service.get_users(skip=skip, limit=limit)@router.put("/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user_update: UserUpdate):user = user_service.update_user(user_id, user_update)if user is None:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,detail="User not found")return user@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int):success = user_service.delete_user(user_id)if not success:raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,detail="User not found")

10.4 使用示例

# 1. 創建用戶
curl -X POST "http://localhost:8000/users/" \-H "Content-Type: application/json" \-d '{"email": "user@example.com", "username": "testuser", "password": "secret123"}'# 2. 獲取用戶列表
curl "http://localhost:8000/users/?skip=0&limit=10"# 3. 獲取單個用戶
curl "http://localhost:8000/users/1"# 4. 更新用戶
curl -X PUT "http://localhost:8000/users/1" \-H "Content-Type: application/json" \-d '{"username": "newusername"}'# 5. 刪除用戶
curl -X DELETE "http://localhost:8000/users/1"

10.5 關鍵特性說明

  1. 讀寫分離

    • 讀操作(get_user, get_users)使用讀節點
    • 寫操作(create_user, update_user, delete_user)使用寫節點
  2. 事務處理

    • 所有寫操作都在 GaleraTransaction.atomic 中執行
    • 自動處理死鎖和重試
  3. 異常處理

    • 業務邏輯異常(如郵箱已存在)
    • 數據庫異常
    • HTTP 響應異常
  4. 性能優化

    • 使用索引(email, username)
    • 分頁查詢
    • 選擇性更新字段
  5. 數據驗證

    • 使用 Pydantic 模型驗證輸入數據
    • 明確的響應模型定義

7. 總結

良好的數據庫配置對于 FastAPI 應用的性能和可靠性至關重要。通過合理配置連接池、正確處理事務、實施監控和日志記錄,可以構建一個健壯的數據庫訪問層。要根據實際應用場景和負載情況,不斷調整和優化配置參數。無論是使用依賴注入還是直接使用數據庫會話,都要確保正確管理數據庫資源,處理異常情況,并遵循最佳實踐。

11. 高并發和巨量數據處理

11.1 高并發優化

11.1.1 連接池配置
from sqlalchemy import create_engine
import multiprocessing# 計算最優連接池大小
CPU_COUNT = multiprocessing.cpu_count()
POOL_SIZE = CPU_COUNT * 2 + 1  # 基礎連接數
MAX_OVERFLOW = POOL_SIZE * 2   # 最大溢出連接數# 高并發場景的連接池配置
engine = create_engine(DATABASE_URL,poolclass=QueuePool,pool_size=POOL_SIZE,max_overflow=MAX_OVERFLOW,pool_timeout=30,pool_recycle=1800,pool_pre_ping=True,# 設置 echo_pool 來監控連接池狀態echo_pool=True
)# 連接池監控
@event.listens_for(engine, "checkout")
def receive_checkout(dbapi_connection, connection_record, connection_proxy):logger.info(f"Connection pool size: {engine.pool.size()}")logger.info(f"Current checked out connections: {engine.pool.checkedin()}")
11.1.2 異步查詢處理
from fastapi import FastAPI, BackgroundTasks
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.future import select# 異步數據庫引擎
async_engine = create_async_engine("mysql+asyncmy://user:pass@localhost/db",pool_size=20,max_overflow=30
)async def get_async_session():async with AsyncSession(async_engine) as session:yield session# 異步查詢示例
async def async_get_users(skip: int = 0, limit: int = 100):async with AsyncSession(async_engine) as session:query = select(User).offset(skip).limit(limit)result = await session.execute(query)return result.scalars().all()# API 實現
@router.get("/users/async")
async def get_users_async(background_tasks: BackgroundTasks,skip: int = 0, limit: int = 100
):# 將耗時操作放入后臺任務background_tasks.add_task(process_user_data)return await async_get_users(skip, limit)
11.1.3 并發控制
from fastapi import HTTPException
from asyncio import Semaphore
import asyncio# 限制并發請求數
MAX_CONCURRENT_REQUESTS = 100
semaphore = Semaphore(MAX_CONCURRENT_REQUESTS)async def controlled_operation():async with semaphore:# 執行數據庫操作pass# 使用 Redis 實現分布式鎖
from redis import Redis
from redis.lock import Lockredis_client = Redis(host='localhost', port=6379, db=0)def with_distributed_lock(lock_name: str, timeout: int = 10):def decorator(func):async def wrapper(*args, **kwargs):lock = Lock(redis_client, lock_name, timeout=timeout)if not lock.acquire(blocking=True, blocking_timeout=5):raise HTTPException(status_code=423,detail="Resource is locked")try:return await func(*args, **kwargs)finally:lock.release()return wrapperreturn decorator# 使用示例
@router.post("/users/batch")
@with_distributed_lock("create_users_lock")
async def create_users_batch(users: List[UserCreate]):return await batch_create_users(users)

11.2 巨量數據處理

11.2.1 分批處理
from typing import List, Generator
import pandas as pdclass BatchProcessor:def __init__(self, batch_size: int = 1000):self.batch_size = batch_sizeself.galera_manager = GaleraSessionManager()def process_large_dataset(self, data: List[dict]):"""分批處理大數據集"""for batch in self._get_batches(data):with self.galera_manager.get_write_db() as db:with GaleraTransaction.atomic(db):self._process_batch(db, batch)def _get_batches(self, data: List[dict]) -> Generator[List[dict], None, None]:"""將數據集分割成小批次"""for i in range(0, len(data), self.batch_size):yield data[i:i + self.batch_size]def _process_batch(self, db: Session, batch: List[dict]):"""處理單個批次的數據"""try:# 使用 bulk_insert_mappings 進行批量插入db.bulk_insert_mappings(User, batch)db.flush()except Exception as e:logger.error(f"Error processing batch: {str(e)}")raise# 使用示例
@router.post("/users/bulk")
async def bulk_create_users(background_tasks: BackgroundTasks,users: List[UserCreate]
):processor = BatchProcessor(batch_size=1000)# 將大量數據處理放入后臺任務background_tasks.add_task(processor.process_large_dataset, users)return {"message": "Bulk processing started"}
11.2.2 流式處理
from fastapi.responses import StreamingResponse
import csv
import ioclass StreamProcessor:CHUNK_SIZE = 1000@staticmethodasync def stream_large_query(query):"""流式處理大型查詢結果"""with SessionLocal() as db:result_proxy = db.execute(query)while True:chunk = result_proxy.fetchmany(StreamProcessor.CHUNK_SIZE)if not chunk:breakyield chunk@staticmethodasync def export_large_dataset():"""導出大型數據集"""output = io.StringIO()writer = csv.writer(output)# 寫入表頭writer.writerow(["id", "email", "username", "created_at"])query = "SELECT id, email, username, created_at FROM users"async for chunk in StreamProcessor.stream_large_query(query):for row in chunk:writer.writerow(row)output.seek(0)data = output.read()output.seek(0)output.truncate(0)yield data# API 實現
@router.get("/users/export")
async def export_users():return StreamingResponse(StreamProcessor.export_large_dataset(),media_type="text/csv",headers={"Content-Disposition": "attachment; filename=users.csv"})
11.2.3 分區表處理
from sqlalchemy import Table, Column, Integer, String, DateTime
from sqlalchemy.schema import CreateTable
import datetimedef create_partition_table(year: int, month: int):"""創建分區表"""partition_name = f"users_{year}_{month:02d}"# 創建分區表partition = Table(partition_name,Base.metadata,Column('id', Integer, primary_key=True),Column('email', String(255)),Column('username', String(50)),Column('created_at', DateTime),postgresql_partition_by='RANGE (created_at)',)# 設置分區范圍start_date = datetime.datetime(year, month, 1)if month == 12:end_date = datetime.datetime(year + 1, 1, 1)else:end_date = datetime.datetime(year, month + 1, 1)partition_sql = f"""CREATE TABLE {partition_name}PARTITION OF usersFOR VALUES FROM ('{start_date}') TO ('{end_date}')"""return partition_sqlclass PartitionManager:def __init__(self):self.engine = create_engine(DATABASE_URL)def ensure_partition_exists(self, date: datetime.datetime):"""確保指定日期的分區存在"""year = date.yearmonth = date.monthpartition_name = f"users_{year}_{month:02d}"# 檢查分區是否存在exists_query = f"""SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = '{partition_name}')"""with self.engine.connect() as conn:exists = conn.execute(exists_query).scalar()if not exists:# 創建新分區partition_sql = create_partition_table(year, month)conn.execute(partition_sql)# 使用示例
@router.post("/users/partitioned")
async def create_user_partitioned(user: UserCreate):partition_manager = PartitionManager()# 確保當前月份的分區存在partition_manager.ensure_partition_exists(datetime.datetime.now())# 創建用戶with galera_manager.get_write_db() as db:with GaleraTransaction.atomic(db):db_user = User(**user.dict())db.add(db_user)return db_user
11.2.4 查詢優化
from sqlalchemy import text
from sqlalchemy.orm import joinedload, selectinloadclass QueryOptimizer:def __init__(self, db: Session):self.db = dbdef optimize_large_table_query(self):"""優化大表查詢"""# 1. 使用索引提示query = text("""SELECT /*+ INDEX(users email_idx) */FROM usersWHERE email LIKE :pattern""")# 2. 使用延遲加載query = self.db.query(User).options(selectinload(User.orders))# 3. 使用覆蓋索引query = self.db.query(User.id, User.email, User.username).filter(User.is_active == True)# 4. 分頁優化query = self.db.query(User).filter(User.id > last_id).order_by(User.id).limit(100)return querydef analyze_query(self, query):"""分析查詢性能"""explain_sql = f"EXPLAIN ANALYZE {query}"return self.db.execute(explain_sql)# 使用示例
@router.get("/users/optimized")
async def get_users_optimized(db: Session = Depends(get_db),last_id: int = 0,limit: int = 100
):optimizer = QueryOptimizer(db)query = optimizer.optimize_large_table_query()return query.all()

11.3 性能監控

import time
from functools import wraps
from prometheus_client import Counter, Histogram# Prometheus 指標
DB_OPERATION_DURATION = Histogram('db_operation_duration_seconds','Database operation duration',['operation_type']
)
DB_OPERATION_ERRORS = Counter('db_operation_errors_total','Database operation errors',['operation_type']
)def monitor_db_operation(operation_type: str):def decorator(func):@wraps(func)async def wrapper(*args, **kwargs):start_time = time.time()try:result = await func(*args, **kwargs)duration = time.time() - start_timeDB_OPERATION_DURATION.labels(operation_type=operation_type).observe(duration)return resultexcept Exception as e:DB_OPERATION_ERRORS.labels(operation_type=operation_type).inc()raisereturn wrapperreturn decorator# 使用示例
@router.get("/users/monitored")
@monitor_db_operation("get_users")
async def get_users_monitored(skip: int = 0, limit: int = 100):return await async_get_users(skip, limit)

11.4 最佳實踐建議

  1. 高并發處理

    • 使用適當的連接池大小
    • 實現請求限流
    • 使用異步操作
    • 實現分布式鎖
  2. 大數據處理

    • 實現批量處理
    • 使用流式處理
    • 實現數據分區
    • 優化查詢性能
  3. 性能監控

    • 監控數據庫操作
    • 記錄性能指標
    • 實現告警機制
  4. 擴展性考慮

    • 實現水平擴展
    • 使用讀寫分離
    • 實現緩存策略

7. 總結

良好的數據庫配置對于 FastAPI 應用的性能和可靠性至關重要。通過合理配置連接池、正確處理事務、實施監控和日志記錄,可以構建一個健壯的數據庫訪問層。要根據實際應用場景和負載情況,不斷調整和優化配置參數。無論是使用依賴注入還是直接使用數據庫會話,都要確保正確管理數據庫資源,處理異常情況,并遵循最佳實踐。

12. 水平擴展、讀寫分離和緩存策略

12.1 水平擴展實現

from typing import List, Optional
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import randomclass DatabaseCluster:"""數據庫集群管理器"""def __init__(self):# 主數據庫配置self.master_db = {"url": "mysql+pymysql://user:pass@master:3306/db","engine": None,"session_maker": None}# 從數據庫配置列表self.slave_dbs = [{"url": "mysql+pymysql://user:pass@slave1:3306/db","weight": 1,  # 權重,用于負載均衡"engine": None,"session_maker": None},{"url": "mysql+pymysql://user:pass@slave2:3306/db","weight": 1,"engine": None,"session_maker": None}]self._initialize_connections()def _initialize_connections(self):"""初始化所有數據庫連接"""# 初始化主庫連接self.master_db["engine"] = create_engine(self.master_db["url"],pool_size=5,max_overflow=10,pool_timeout=30)self.master_db["session_maker"] = sessionmaker(bind=self.master_db["engine"])# 初始化從庫連接for slave in self.slave_dbs:slave["engine"] = create_engine(slave["url"],pool_size=10,max_overflow=20,pool_timeout=30)slave["session_maker"] = sessionmaker(bind=slave["engine"])def get_master_session(self):"""獲取主庫會話"""return self.master_db["session_maker"]()def get_slave_session(self):"""獲取從庫會話(帶權重的隨機選擇)"""total_weight = sum(slave["weight"] for slave in self.slave_dbs)r = random.uniform(0, total_weight)current_weight = 0for slave in self.slave_dbs:current_weight += slave["weight"]if r <= current_weight:return slave["session_maker"]()# 默認返回第一個從庫return self.slave_dbs[0]["session_maker"]()# 使用示例
db_cluster = DatabaseCluster()class UserService:"""用戶服務,演示讀寫分離"""@staticmethoddef get_user_by_id(user_id: int) -> Optional[User]:"""讀操作:從從庫讀取"""session = db_cluster.get_slave_session()try:return session.query(User).filter(User.id == user_id).first()finally:session.close()@staticmethoddef create_user(user_data: dict) -> User:"""寫操作:在主庫寫入"""session = db_cluster.get_master_session()try:user = User(**user_data)session.add(user)session.commit()return userfinally:session.close()

12.2 讀寫分離策略

from contextlib import contextmanager
from typing import Generator
from sqlalchemy.orm import Sessionclass ReadWriteManager:"""讀寫分離管理器"""def __init__(self):self.db_cluster = DatabaseCluster()@contextmanagerdef read_session(self) -> Generator[Session, None, None]:"""獲取讀會話(從從庫)"""session = self.db_cluster.get_slave_session()try:yield sessionfinally:session.close()@contextmanagerdef write_session(self) -> Generator[Session, None, None]:"""獲取寫會話(從主庫)"""session = self.db_cluster.get_master_session()try:yield sessionsession.commit()except Exception:session.rollback()raisefinally:session.close()# 服務層實現
class EnhancedUserService:def __init__(self):self.db_manager = ReadWriteManager()def get_users(self, skip: int = 0, limit: int = 100) -> List[User]:"""讀操作示例"""with self.db_manager.read_session() as session:return session.query(User).offset(skip).limit(limit).all()def create_user(self, user_data: dict) -> User:"""寫操作示例"""with self.db_manager.write_session() as session:user = User(**user_data)session.add(user)return userdef update_user(self, user_id: int, user_data: dict) -> Optional[User]:"""復雜操作示例:先讀后寫"""# 首先從從庫讀取with self.db_manager.read_session() as read_session:user = read_session.query(User).filter(User.id == user_id).first()if not user:return None# 然后在主庫更新with self.db_manager.write_session() as write_session:user = write_session.query(User).filter(User.id == user_id).first()for key, value in user_data.items():setattr(user, key, value)return user

12.3 緩存策略實現

from functools import wraps
from typing import Any, Optional
import json
from redis import Redis
from datetime import timedeltaclass CacheManager:"""緩存管理器"""def __init__(self):self.redis_client = Redis(host='localhost',port=6379,db=0,decode_responses=True)self.default_ttl = timedelta(minutes=30)def get(self, key: str) -> Optional[str]:"""獲取緩存"""return self.redis_client.get(key)def set(self, key: str, value: Any, ttl: Optional[timedelta] = None):"""設置緩存"""if ttl is None:ttl = self.default_ttlif isinstance(value, (dict, list)):value = json.dumps(value)self.redis_client.set(key, value, ex=int(ttl.total_seconds()))def delete(self, key: str):"""刪除緩存"""self.redis_client.delete(key)def clear_pattern(self, pattern: str):"""清除匹配模式的所有緩存"""keys = self.redis_client.keys(pattern)if keys:self.redis_client.delete(*keys)# 緩存裝飾器
def cache_result(prefix: str,ttl: Optional[timedelta] = None,key_func = None
):"""緩存裝飾器:param prefix: 緩存鍵前綴:param ttl: 緩存過期時間:param key_func: 自定義緩存鍵生成函數"""cache_manager = CacheManager()def decorator(func):@wraps(func)async def wrapper(*args, **kwargs):# 生成緩存鍵if key_func:cache_key = f"{prefix}:{key_func(*args, **kwargs)}"else:# 默認使用參數作為緩存鍵key_parts = [str(arg) for arg in args]key_parts.extend(f"{k}:{v}" for k, v in sorted(kwargs.items()))cache_key = f"{prefix}:{':'.join(key_parts)}"# 嘗試從緩存獲取cached_result = cache_manager.get(cache_key)if cached_result:return json.loads(cached_result)# 執行原函數result = await func(*args, **kwargs)# 存入緩存cache_manager.set(cache_key, result, ttl)return resultreturn wrapperreturn decorator# 使用示例
class CachedUserService:def __init__(self):self.db_manager = ReadWriteManager()self.cache_manager = CacheManager()@cache_result(prefix="user", ttl=timedelta(minutes=10))async def get_user(self, user_id: int) -> Optional[dict]:"""帶緩存的用戶查詢"""with self.db_manager.read_session() as session:user = session.query(User).filter(User.id == user_id).first()if user:return user.to_dict()return Noneasync def update_user(self, user_id: int, user_data: dict) -> Optional[dict]:"""更新用戶并清除緩存"""with self.db_manager.write_session() as session:user = session.query(User).filter(User.id == user_id).first()if not user:return Nonefor key, value in user_data.items():setattr(user, key, value)# 清除相關緩存self.cache_manager.delete(f"user:{user_id}")# 清除可能包含該用戶的列表緩存self.cache_manager.clear_pattern("user_list:*")return user.to_dict()@cache_result(prefix="user_list", ttl=timedelta(minutes=5))async def get_user_list(self,skip: int = 0,limit: int = 100,status: Optional[str] = None) -> List[dict]:"""帶緩存的用戶列表查詢"""with self.db_manager.read_session() as session:query = session.query(User)if status:query = query.filter(User.status == status)users = query.offset(skip).limit(limit).all()return [user.to_dict() for user in users]# FastAPI 路由實現
@router.get("/users/{user_id}")
async def get_user(user_id: int):service = CachedUserService()user = await service.get_user(user_id)if not user:raise HTTPException(status_code=404, detail="User not found")return user@router.put("/users/{user_id}")
async def update_user(user_id: int, user_update: UserUpdate):service = CachedUserService()user = await service.update_user(user_id, user_update.dict())if not user:raise HTTPException(status_code=404, detail="User not found")return user@router.get("/users/")
async def get_users(skip: int = 0,limit: int = 100,status: Optional[str] = None
):service = CachedUserService()return await service.get_user_list(skip, limit, status)

12.4 最佳實踐建議

  1. 水平擴展注意事項

    • 合理配置主從數據庫的連接池大小
    • 實現故障轉移機制
    • 監控數據庫負載和性能
    • 定期檢查主從同步狀態
  2. 讀寫分離策略

    • 明確區分讀寫操作
    • 處理讀寫延遲問題
    • 實現事務一致性
    • 合理分配讀寫流量
  3. 緩存策略

    • 選擇合適的緩存粒度
    • 設置合理的過期時間
    • 及時清除相關緩存
    • 處理緩存擊穿和雪崩問題
  4. 性能優化

    • 使用連接池
    • 實現請求隊列
    • 添加監控和告警
    • 定期優化數據庫
  5. 可用性保障

    • 實現健康檢查
    • 添加重試機制
    • 做好容錯處理
    • 實現優雅降級

12.5 使用建議

  1. 水平擴展場景

    • 讀多寫少的業務
    • 高并發查詢場景
    • 需要數據備份的場景
    • 地理分布式部署
  2. 讀寫分離使用場景

    • 讀寫比例失衡
    • 需要提高讀取性能
    • 數據一致性要求不高
    • 復雜查詢較多
  3. 緩存策略使用場景

    • 熱點數據訪問
    • 計算密集型查詢
    • 接口響應優化
    • 減少數據庫壓力
  4. 注意事項

    • 定期檢查系統性能
    • 監控資源使用情況
    • 做好數據備份
    • 制定應急預案

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/66749.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/66749.shtml
英文地址,請注明出處:http://en.pswp.cn/web/66749.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

深度解析 Java 的幻讀現象與應對策略

目錄 一、幻讀現象的本質 二、幻讀在 Java 數據庫編程中的體現 三、幻讀帶來的問題 四、應對幻讀的策略 1. 數據庫隔離級別 2. 應用層解決方案 五、總結 在 Java 的數據庫編程領域&#xff0c;幻讀是一個不容忽視的概念。它涉及到數據庫事務處理過程中數據一致性的關鍵問…

Glary Utilities Pro 多語便攜版系統優化工具 v6.21.0.25

Glary Utilities是一款功能強大的系統優化工具軟件&#xff0c;旨在幫助用戶清理計算機垃圾文件、修復系統錯誤、優化系統性能等。 軟件功能 清理和修復&#xff1a;可以清理系統垃圾文件、無效注冊表項、無效快捷方式等&#xff0c;修復系統錯誤和藍屏問題。 優化和加速&…

【貪心算法】洛谷P1106 - 刪數問題

2025 - 12 - 26 - 第 46 篇 【洛谷】貪心算法題單 - 【貪心算法】 - 【學習筆記】 作者(Author): 鄭龍浩 / 仟濹(CSND賬號名) 目錄 文章目錄 目錄P1106 刪數問題題目描述輸入格式輸出格式樣例 #1樣例輸入 #1樣例輸出 #1 提示思路代碼 P1106 刪數問題 題目描述 鍵盤輸入一個高…

Oracle 創建并使用外部表

目錄 一. 什么是外部表二. 創建外部表所在的文件夾對象三. 授予訪問外部表文件夾的權限3.1 DBA用戶授予普通用戶訪問外部表文件夾的權限3.2 授予Win10上的Oracle用戶訪問桌面文件夾的權限 四. 普通用戶創建外部表五. 查詢六. 刪除 一. 什么是外部表 在 Oracle 數據庫中&#x…

基于FPGA的BPSK+costas環實現,包含testbench,分析不同信噪比對costas環性能影響

目錄 1.算法仿真效果 2.算法涉及理論知識概要 3.Verilog核心程序 4.完整算法代碼文件獲得 1.算法仿真效果 本作品是之前作品的改進和擴展&#xff1a; 1.m基于FPGA的BPSK調制解調通信系統verilog實現,包含testbench,包含載波同步_csdn基于fpga的bpsk-CSDN博客 2.m基于FP…

Linux 目錄操作詳解

Linux目錄操作詳解 1. 獲取當前工作目錄1.1 getcwd()1.2 get_current_dir_name() 2. 切換工作目錄2.1 chdir() 3. 創建和刪除目錄3.1 mkdir()3.2 rmdir() 4. 獲取目錄中的文件列表4.1 opendir() 打開目錄4.2 readdir() 讀取目錄內容4.3 closedir() 關閉目錄 5. dirent 結構體6.…

Spring 依賴注入詳解:創建 Bean 和注入依賴是一回事嗎?

1. 什么是依賴注入&#xff08;Dependency Injection&#xff0c;DI&#xff09;&#xff1f; 依賴注入 是 Spring IoC&#xff08;控制反轉&#xff09;容器的核心功能。它的目標是將對象的依賴&#xff08;如其他對象或配置&#xff09;從對象本身中剝離&#xff0c;由容器負…

AI時代的網絡安全:傳統技術的落寞與新機遇

AI時代的網絡安全&#xff1a;傳統技術的落寞與新機遇 在AI技術飛速發展的浪潮中&#xff0c;網絡安全領域正經歷著前所未有的變革。一方面&#xff0c;傳統網絡安全技術在面對新型攻擊手段時逐漸顯露出局限性&#xff1b;另一方面&#xff0c;AI為網絡安全帶來了新的機遇&…

后端開發Web

Maven Maven是apache旗下的一個開源項目&#xff0c;是一款用于管理和構建java項目的工具 Maven的作用 依賴管理 方便快捷的管理項目依賴的資源&#xff08;jar包&#xff09;&#xff0c;避免版本沖突問題 統一項目結構 提供標準、統一的項目結構 項目構建 標準跨平臺(…

前沿技術趨勢洞察:2024年技術的嶄新篇章與未來走向!

引言 時光飛逝&#xff0c;2024年已經來臨&#xff0c;回顧過去一年&#xff0c;科技的迅猛進步簡直讓人目不暇接。 在人工智能&#xff08;AI&#xff09;越來越強大的今天&#xff0c;我們不再停留在幻想階段&#xff0c;量子計算的雛形開始展示它的無窮潛力&#xff0c;Web …

【10.2】隊列-設計循環隊列

一、題目 設計你的循環隊列實現。 循環隊列是一種線性數據結構&#xff0c;其操作表現基于 FIFO&#xff08;先進先出&#xff09;原則并且隊尾被連接在隊首之后以形成一個循環。它也被稱為“環形緩沖器”。 循環隊列的一個好處是我們可以利用這個隊列之前用過的空間。在一個普…

博客之星2024年度總評選——我的年度創作回顧與總結

2024年&#xff0c;是我在CSDN博客上持續耕耘、不斷成長的一年。在此&#xff0c;與大家分享一下我的年度創作回顧與總結。 一、創作成長與突破 在人工智能領域&#xff0c;技術迭代迅速&#xff0c;知識更新頻繁。為了保持自己的競爭力&#xff0c;在今年&#xff0c;我始終…

IDEA運行Java項目總會報程序包xxx不存在

我的在另外一臺電腦上跑是沒有問題的&#xff0c;在新的電腦上跑的時候&#xff0c;又出現了這個惡心的問題...... 思來想去&#xff0c;唯一的問題就是我的mavn環境沒的配置好 如何在本地部署mavn環境&#xff0c;這里推薦一篇很好的文章&#xff1a; Maven安裝與配置&…

java 根據前端傳回的png圖片數組,后端加水印加密碼生成pdf,返回給前端

前端傳回的png圖片數組&#xff0c;后端加水印加密碼生成pdf&#xff0c;返回給前端 場景&#xff1a;重點&#xff1a;maven依賴controllerservice 場景&#xff1a; 當前需求&#xff0c;前端通過html2canvas將頁面報表生成圖片下載&#xff0c;可以仍然不滿意。 需要java后…

數據分庫分表和遷移方案

在我們業務快速發展的過程中&#xff0c;數據量必然也會迎來突飛猛漲。那么當我們的數據量百倍、千倍、萬倍、億倍增長后&#xff0c;原有的單表性能就不能滿足我們日常的查詢和寫入了&#xff0c;此時數據架構就不得不進行拆分&#xff0c;比如單表拆分成10張表、100張表、單個…

線上突發:MySQL 自增 ID 用完,怎么辦?

線上突發&#xff1a;MySQL 自增 ID 用完&#xff0c;怎么辦&#xff1f; 1. 問題背景2. 場景復現3. 自增id用完怎么辦&#xff1f;4. 總結 1. 問題背景 最近&#xff0c;我們在數據庫巡檢的時候發現了一個問題&#xff1a;線上的地址表自增主鍵用的是int類型。隨著業務越做越…

[Java] Solon 框架的三大核心組件之一插件擴展體系

1、Solon 的三大核心組件 核心組件說明Plugin 插件擴展機制提供“編碼風格”的擴展體系Ioc/Aop 應用容器提供基于注入依賴的自動裝配體系ContextHandler 通用上下文處理接口提供“開放式處理”適配體系&#xff08;俗稱&#xff0c;三元合一&#xff09; 2、Solon Plugin 插件…

TRELLIS微軟的圖生3D

TRELLIS 教程目錄&#xff1a; Youtube&#xff1a;https://www.youtube.com/watch?vJqFHZ-dRMhI 官網地址&#xff1a;https://trellis3d.github.io/ GitHub&#xff1a;https://github.com/Microsoft/TRELLIS 部署目錄&#xff1a; 克隆項目 git clone --recurse-submodul…

Java導出通過Word模板導出docx文件并通過QQ郵箱發送

一、創建Word模板 {{company}}{{Date}}服務器運行情況報告一、服務器&#xff1a;總告警次數&#xff1a;{{ServerTotal}} 服務器IP:{{IPA}}&#xff0c;總共告警次數:{{ServerATotal}} 服務器IP:{{IPB}}&#xff0c;總共告警次數:{{ServerBTotal}} 服務器IP:{{IPC}}&#x…

【22】Word:小李-高新技術企業政策?

目錄 題目? NO1.2 NO3 NO4 NO5.6 NO7.8 NO9.10 若文章中存在刪除空白行等要求&#xff0c;可以到最后來完成。注意最后一定要檢查此部分&#xff01;注意&#xff1a;大多是和事例一樣即可&#xff0c;不用一摸一樣&#xff0c;但也不要差太多。 題目 NO1.2 F12Fn&a…