架構概述
基于微服務的身份認證架構采用OAuth 2.0/OpenID Connect協議,OpenIddict作為認證服務器,FastAPI作為資源服務器。系統包含三個核心組件:認證服務、API網關和業務微服務。OpenIddict負責頒發令牌,FastAPI通過JWT驗證訪問權限。
技術棧選型
- 認證服務器: OpenIddict 4.8(基于ASP.NET Core)
- 資源服務器: FastAPI 0.95+(Python 3.10+)
- 令牌格式: JWT with RSA256簽名
- 數據庫: PostgreSQL 14用于存儲客戶端和令牌數據
- 消息隊列: Redis 7用于授權碼流處理
認證服務設計
OpenIddict配置需啟用授權碼、客戶端憑證和刷新令牌流程。關鍵配置項包括:
services.AddOpenIddict().AddCore(options => options.UseEntityFrameworkCore().UseDbContext<AuthDbContext>()).AddServer(options => {options.SetTokenEndpointUris("/connect/token");options.SetAuthorizationEndpointUris("/connect/authorize");options.AllowAuthorizationCodeFlow().AllowClientCredentialsFlow();options.RegisterScopes("api1", "offline_access");options.AddDevelopmentEncryptionCertificate().AddDevelopmentSigningCertificate();}).AddValidation(options => options.UseLocalServer());
FastAPI 資源服務器實現
安裝必需依賴:
pip install fastapi uvicorn python-jose[cryptography] passlib bcrypt
JWT驗證中間件示例:
from fastapi import Depends, HTTPException
from fastapi.security import OAuth2AuthorizationCodeBearer
from jose import JWTError, jwtoauth2_scheme = OAuth2AuthorizationCodeBearer(authorizationUrl="http://auth-server/connect/authorize",tokenUrl="http://auth-server/connect/token"
)async def verify_token(token: str = Depends(oauth2_scheme)):try:payload = jwt.decode(token,key=public_key, # 從認證服務獲取的公鑰algorithms=["RS256"],audience="api1")return payloadexcept JWTError:raise HTTPException(status_code=401, detail="Invalid token")
微服務間通信安全
服務到服務通信采用客戶端憑證流程,需在OpenIddict注冊機器客戶端:
{"client_id": "service_account","client_secret": "your-secret","grant_types": ["client_credentials"],"scopes": ["api1"]
}
Python客戶端實現示例:
import httpxasync def get_service_token():async with httpx.AsyncClient() as client:response = await client.post("http://auth-server/connect/token",data={"grant_type": "client_credentials","client_id": "service_account","client_secret": "your-secret","scope": "api1"})return response.json()["access_token"]
權限控制設計
基于RBAC模型實現細粒度權限控制,OpenIddict scope與角色權限映射示例:
from fastapi import Securitydef require_permission(permission: str):async def checker(payload: dict = Security(verify_token)):if permission not in payload.get("scope", "").split():raise HTTPException(status_code=403, detail="Insufficient permissions")return payloadreturn checker# 使用示例
@app.get("/admin")
async def admin_route(user: dict = Depends(require_permission("admin"))):return {"message": "Admin access granted"}
性能優化策略
- 令牌緩存: Redis緩存已驗證的JWT,減少重復驗證開銷
- 公鑰輪換: JWKS端點實現自動密鑰輪換
- 批量驗證: 對微服務批量請求采用令牌哈希校驗
Redis緩存實現示例:
from redis import asyncio as aioredisasync def cached_verify_token(token: str):redis = aioredis.from_url("redis://localhost")cache_key = f"token:{hashlib.sha256(token.encode()).hexdigest()}"if await redis.exists(cache_key):return json.loads(await redis.get(cache_key))payload = await verify_token(token)await redis.setex(cache_key, 3600, json.dumps(payload))return payload
安全增強措施
- 令牌綁定: 實現DPoP機制防止令牌重放
- 審計日志: 記錄所有令牌頒發和驗證事件
- 令牌撤銷: 實時檢查Redis黑名單
部署架構
建議采用容器化部署方案:
# Auth服務
FROM mcr.microsoft.com/dotnet/sdk:7.0 AS auth-server
EXPOSE 5000
COPY --from=builder /app .
ENTRYPOINT ["dotnet", "AuthServer.dll"]# FastAPI服務
FROM python:3.10-slim
EXPOSE 8000
COPY ./app /app
RUN pip install -r /app/requirements.txt
CMD ["uvicorn", "main:app", "--host", "0.0.0.0"]
監控指標
關鍵監控項包括:
- 認證服務QPS和響應時間
- JWT驗證失敗率
- 令牌緩存命中率
- 權限檢查延遲
Prometheus監控示例配置:
scrape_configs:- job_name: 'auth'metrics_path: '/metrics'static_configs:- targets: ['auth-server:5000']- job_name: 'api'static_configs:- targets: ['api-service:8000']