以下是一個基於FastAPI框架設計的日志模塊,結合SQLite數據庫實現增刪改查功能的完整實現方案:
1. 項目結構
your_project/
├── app/
│ ├── logs/ # 日志模塊目錄
│ │ ├── models.py # 數據庫模型定義
│ │ ├── schemas.py # Pydantic數據驗證模型
│ │ ├── crud.py # 數據庫操作邏輯
│ │ └── routes.py # API路由定義
│ ├── database.py # 數據庫連接配置
│ └── main.py # FastAPI主入口
├── logs.db # SQLite數據庫文件
└── requirements.txt
2. 核心代碼實現
2.1 數據庫配置 (app/database.py)
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# SQLite database file; the relative path is resolved against the working
# directory of the server process.
SQLALCHEMY_DATABASE_URL = "sqlite:///./logs.db"

# check_same_thread=False allows a SQLite connection to be used from a thread
# other than the one that created it, which the sqlite3 driver refuses by
# default.
engine = create_engine(
    SQLALCHEMY_DATABASE_URL,
    connect_args={"check_same_thread": False},
)

# Session factory bound to the engine. Autocommit and autoflush are disabled,
# so changes are written only when the caller commits explicitly.
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Base class that every ORM model inherits from.
Base = declarative_base()


def get_db():
    """Yield a database session and close it when the caller is done.

    Written as a generator so it can be used as a FastAPI dependency
    (``Depends(get_db)``). The ``finally`` clause closes the session even
    when the consumer raises.

    Yields:
        A new session created by ``SessionLocal``.
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
2.2 數據模型 (app/logs/models.py)
from sqlalchemy import Column, Integer, String, DateTime
from datetime import datetime
# models.py lives in app/logs/, so app/database.py is one package level up.
from ..database import Base


class Log(Base):
    """ORM model for one row of the ``logs`` table."""

    __tablename__ = "logs"

    id = Column(Integer, primary_key=True, index=True)
    # Log severity (INFO, WARNING, ERROR)
    level = Column(String(10), index=True)
    message = Column(String(500))
    # The callable itself is passed (not its result), so the default is
    # evaluated for each inserted row.
    timestamp = Column(DateTime, default=datetime.utcnow)
    # Origin of the log entry
    source = Column(String(50), default="application")
2.3 Pydantic模型 (app/logs/schemas.py)
from pydantic import BaseModel
from datetime import datetime


class LogCreate(BaseModel):
    """Payload accepted when creating or updating a log entry."""

    level: str
    message: str
    source: str = "application"


class LogResponse(LogCreate):
    """Log entry as returned by the API: the payload plus stored fields."""

    id: int
    timestamp: datetime

    class Config:
        # Lets pydantic build this model from an ORM object by reading its
        # attributes instead of requiring a dict.
        orm_mode = True
2.4 CRUD操作 (app/logs/crud.py)
from sqlalchemy.orm import Session
from . import models, schemas


def create_log(db: Session, log: schemas.LogCreate):
    """Insert a new log entry and return the stored row.

    Args:
        db: Open database session.
        log: Validated payload; its fields become the column values.

    Returns:
        The new ``models.Log`` row, refreshed after commit so that ``id`` and
        ``timestamp`` are loaded.
    """
    db_log = models.Log(**log.dict())
    db.add(db_log)
    db.commit()
    db.refresh(db_log)
    return db_log


def get_logs(db: Session, skip: int = 0, limit: int = 100):
    """Return one window of log entries ordered by ``id``.

    Args:
        db: Open database session.
        skip: Number of rows to skip.
        limit: Maximum number of rows to return.

    Returns:
        A list of ``models.Log`` rows.
    """
    # OFFSET/LIMIT without ORDER BY has no guaranteed row order, so pages
    # could overlap or skip rows; order by primary key to keep them stable.
    return (
        db.query(models.Log)
        .order_by(models.Log.id)
        .offset(skip)
        .limit(limit)
        .all()
    )


def get_log_by_id(db: Session, log_id: int):
    """Return the log entry with the given ``id``, or ``None`` if absent."""
    return db.query(models.Log).filter(models.Log.id == log_id).first()


def update_log(db: Session, log_id: int, log: schemas.LogCreate):
    """Overwrite the fields of an existing log entry.

    Args:
        db: Open database session.
        log_id: Primary key of the entry to update.
        log: Payload whose fields replace the stored values.

    Returns:
        The updated ``models.Log`` row, or ``None`` when no entry has that id.
    """
    db_log = get_log_by_id(db, log_id)
    if db_log is None:
        return None
    for key, value in log.dict().items():
        setattr(db_log, key, value)
    db.commit()
    db.refresh(db_log)
    return db_log


def delete_log(db: Session, log_id: int):
    """Delete a log entry.

    Args:
        db: Open database session.
        log_id: Primary key of the entry to delete.

    Returns:
        ``True`` if an entry was deleted, ``False`` if none had that id.
    """
    db_log = get_log_by_id(db, log_id)
    if db_log is None:
        return False
    db.delete(db_log)
    db.commit()
    return True
2.5 API路由 (app/logs/routes.py)
from fastapi import APIRouter, Depends, HTTPException, Query
from sqlalchemy.orm import Session

from ..database import get_db
from . import schemas, crud

router = APIRouter(prefix="/logs", tags=["logs"])


@router.post("/", response_model=schemas.LogResponse)
def create_log_entry(log: schemas.LogCreate, db: Session = Depends(get_db)):
    """Create a log entry from the request body and return the stored row."""
    return crud.create_log(db, log)


@router.get("/", response_model=list[schemas.LogResponse])
def read_logs(
    # Bounds are enforced at the API edge: a negative LIMIT means "no limit"
    # in SQLite, so unchecked values could return the whole table.
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    db: Session = Depends(get_db),
):
    """Return a window of log entries selected by ``skip`` and ``limit``."""
    return crud.get_logs(db, skip=skip, limit=limit)


@router.get("/{log_id}", response_model=schemas.LogResponse)
def read_log(log_id: int, db: Session = Depends(get_db)):
    """Return a single log entry.

    Raises:
        HTTPException: 404 when no entry has the given id.
    """
    db_log = crud.get_log_by_id(db, log_id)
    if not db_log:
        raise HTTPException(status_code=404, detail="Log not found")
    return db_log


@router.put("/{log_id}", response_model=schemas.LogResponse)
def update_log_entry(
    log_id: int,
    log: schemas.LogCreate,
    db: Session = Depends(get_db),
):
    """Replace the fields of an existing log entry and return it.

    Raises:
        HTTPException: 404 when no entry has the given id.
    """
    updated_log = crud.update_log(db, log_id, log)
    if not updated_log:
        raise HTTPException(status_code=404, detail="Log not found")
    return updated_log


@router.delete("/{log_id}")
def delete_log_entry(log_id: int, db: Session = Depends(get_db)):
    """Delete a log entry and return a confirmation message.

    Raises:
        HTTPException: 404 when no entry has the given id.
    """
    success = crud.delete_log(db, log_id)
    if not success:
        raise HTTPException(status_code=404, detail="Log not found")
    return {"message": "Log deleted successfully"}
2.6 主入口 (app/main.py)
from fastapi import FastAPI
from .database import engine
from .logs.models import Log
from .logs.routes import router as logs_router

# Create the tables registered on the models' shared metadata if they do not
# exist yet. This runs once, when the module is imported.
Log.metadata.create_all(bind=engine)

app = FastAPI()
app.include_router(logs_router)


@app.get("/")
def root():
    """Return a static message identifying the service."""
    return {"message": "Logging System API"}
3. 使用方式
3.1 啟動服務
uvicorn app.main:app --reload
3.2 API測試示例
# Create a log entry
curl -X POST "http://localhost:8000/logs/" \
  -H "Content-Type: application/json" \
  -d '{"level": "ERROR", "message": "Database connection failed"}'

# List all log entries
curl "http://localhost:8000/logs/"

# Update a log entry
curl -X PUT "http://localhost:8000/logs/1" \
  -H "Content-Type: application/json" \
  -d '{"level": "WARNING", "message": "Connection timeout"}'

# Delete a log entry
curl -X DELETE "http://localhost:8000/logs/1"
4. 高級功能擴展建議
- 日志過濾:
# Add to crud.py
from typing import Optional


def filter_logs(
    db: Session,
    level: Optional[str] = None,
    source: Optional[str] = None,
):
    """Return the log entries that match the given filters.

    Args:
        db: Open database session.
        level: When non-empty, keep only entries with this exact level.
        source: When non-empty, keep only entries with this exact source.

    Returns:
        A list of matching ``models.Log`` rows; every row when neither filter
        is given.
    """
    query = db.query(models.Log)
    if level:
        query = query.filter(models.Log.level == level)
    if source:
        query = query.filter(models.Log.source == source)
    return query.all()
- 自動記錄請求日志:
# Add this middleware to main.py
import asyncio
import time

from fastapi import Request

from .database import SessionLocal
from .logs import crud, schemas


def _write_request_log(log_data: dict) -> None:
    """Store one request-log entry using a short-lived session of its own."""
    with SessionLocal() as db:
        crud.create_log(db, schemas.LogCreate(**log_data))


@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Record every HTTP request as an INFO log entry with source "http".

    Args:
        request: The incoming request.
        call_next: Callable that passes the request on and returns the
            response.

    Returns:
        The response produced by ``call_next``, unchanged.
    """
    # perf_counter is monotonic, so the duration cannot be skewed by
    # wall-clock adjustments.
    start_time = time.perf_counter()
    response = await call_next(request)
    process_time = (time.perf_counter() - start_time) * 1000
    log_data = {
        "level": "INFO",
        "message": (
            f"{request.method} {request.url} - {response.status_code} "
            f"({process_time:.2f}ms)"
        ),
        "source": "http",
    }
    # The ORM write is blocking; run it in a worker thread so the event loop
    # can keep serving other requests in the meantime.
    await asyncio.to_thread(_write_request_log, log_data)
    return response
- 日志分頁查詢:
# Improved GET handler for routes.py
from fastapi import Query


@router.get("/", response_model=list[schemas.LogResponse])
def read_logs(
    # page starts at 1; rejecting smaller values keeps the offset non-negative.
    page: int = Query(1, ge=1),
    # The upper bound stops a single request from asking for the whole table.
    per_page: int = Query(20, ge=1, le=100),
    db: Session = Depends(get_db),
):
    """Return one page of log entries.

    Args:
        page: 1-based page number.
        per_page: Number of entries per page.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The entries on the requested page.
    """
    skip = (page - 1) * per_page
    return crud.get_logs(db, skip=skip, limit=per_page)
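該設計實現了完整的日志管理功能。路由以同步函數(def)定義,FastAPI會在線程池中執行它們,因此阻塞式的SQLAlchemy調用不會卡住事件循環。通過SQLAlchemy ORM層,可以輕鬆切換其他數據庫(如PostgreSQL/MySQL):只需更換連接URL、安裝對應的數據庫驅動,並移除SQLite專用的connect_args(check_same_thread)。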