- FastAPI server
- Celery task queue
- RabbitMQ as the message broker
- Scheduled task processing
First, create the project structure:
c:\Users\Administrator\Desktop\meitu\
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── celery_app.py
│   ├── tasks.py
│   └── config.py
├── requirements.txt
└── celery_worker.py
- First, create requirements.txt:
fastapi==0.104.1
uvicorn==0.24.0
celery==5.3.4
python-dotenv==1.0.0
requests==2.31.0
python-multipart  # needed by FastAPI to parse uploaded files (UploadFile)
- Create the configuration file (app/config.py):
from dotenv import load_dotenv
import os

# Load variables from a local .env file, if one exists
load_dotenv()

# RabbitMQ settings
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = os.getenv("RABBITMQ_PORT", "5672")
RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest")

# Celery settings
CELERY_BROKER_URL = f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}//"
CELERY_RESULT_BACKEND = "rpc://"

# Scheduled task (beat) settings
CELERY_BEAT_SCHEDULE = {
    'process-images-every-hour': {
        'task': 'app.tasks.process_images',
        'schedule': 3600.0,  # run once an hour
    },
    'daily-cleanup': {
        'task': 'app.tasks.cleanup_old_images',
        'schedule': 86400.0,  # run once a day
    },
}
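The broker URL above interpolates the credentials directly, which produces a malformed URL if the username or password contains characters such as `@` or `/`. A minimal sketch of one way around that, assuming a hypothetical helper named `build_broker_url` (it is not part of the original config) that percent-encodes the credentials with the standard library:

```python
from urllib.parse import quote


def build_broker_url(user, password, host, port, vhost="/"):
    """Build an amqp:// URL with percent-encoded credentials (hypothetical helper)."""
    # safe="" makes quote() encode every reserved character, including "/" and "@".
    user_enc = quote(user, safe="")
    pass_enc = quote(password, safe="")
    # The default vhost "/" is written as a trailing "//", matching the URL in
    # the config above; any other vhost name is percent-encoded.
    vhost_part = "/" if vhost == "/" else quote(vhost, safe="")
    return f"amqp://{user_enc}:{pass_enc}@{host}:{port}/{vhost_part}"
```

With the default guest credentials this returns the same URL as the f-string in the config; the encoding only changes the result when a credential contains reserved characters.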
- Create the Celery application (app/celery_app.py):
from celery import Celery
from app.config import CELERY_BROKER_URL, CELERY_RESULT_BACKEND, CELERY_BEAT_SCHEDULE

celery_app = Celery(
    'image_processing',
    broker=CELERY_BROKER_URL,
    backend=CELERY_RESULT_BACKEND,
    include=['app.tasks'],
)

# Configure the scheduled tasks
celery_app.conf.beat_schedule = CELERY_BEAT_SCHEDULE
celery_app.conf.timezone = 'Asia/Shanghai'
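The schedule entries are given as raw second counts. Celery beat also accepts `datetime.timedelta` objects for the `schedule` key, which can be easier to read. A sketch of the same two entries written that way; the variable name `BEAT_SCHEDULE_TIMEDELTA` is only illustrative and does not appear in the original files:

```python
from datetime import timedelta

# Same tasks and intervals as the original schedule, expressed as timedeltas.
BEAT_SCHEDULE_TIMEDELTA = {
    "process-images-every-hour": {
        "task": "app.tasks.process_images",
        "schedule": timedelta(hours=1),
    },
    "daily-cleanup": {
        "task": "app.tasks.cleanup_old_images",
        "schedule": timedelta(days=1),
    },
}

# Print each interval in seconds to compare with the numeric version.
for name, entry in BEAT_SCHEDULE_TIMEDELTA.items():
    print(name, entry["schedule"].total_seconds())
```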
- Create the tasks file (app/tasks.py):
from app.celery_app import celery_app
from app.watermark import ImageWatermarker  # app/watermark.py is not listed in this section
import os
from datetime import datetime, timedelta


@celery_app.task
def add_watermark_task(image_path, text, position='center'):
    """Add a watermark asynchronously."""
    watermarker = ImageWatermarker()
    try:
        result_path = watermarker.add_watermark(
            image_path=image_path,
            text=text,
            position=position,
        )
        return {"status": "success", "output_path": result_path}
    except Exception as e:
        return {"status": "error", "message": str(e)}


@celery_app.task
def process_images():
    """Scheduled task: enqueue a watermark job for every pending image."""
    image_dir = "images/pending"
    if not os.path.exists(image_dir):
        return {"status": "error", "message": "Pending directory not found"}
    processed = 0
    for image in os.listdir(image_dir):
        if image.lower().endswith(('.png', '.jpg', '.jpeg')):
            # Files are not moved out of images/pending here, so each run enqueues them again.
            add_watermark_task.delay(
                os.path.join(image_dir, image),
                "Auto-processed watermark",
                'center',
            )
            processed += 1
    return {"status": "success", "processed": processed}


@celery_app.task
def cleanup_old_images():
    """Scheduled task: delete processed images older than seven days."""
    output_dir = "images/processed"
    if not os.path.exists(output_dir):
        return {"status": "error", "message": "Output directory not found"}
    threshold_date = datetime.now() - timedelta(days=7)
    cleaned = 0
    for image in os.listdir(output_dir):
        image_path = os.path.join(output_dir, image)
        # os.remove() fails on directories, so only regular files are considered.
        if not os.path.isfile(image_path):
            continue
        # getctime() is the creation time on Windows but the metadata-change time on Unix.
        if os.path.getctime(image_path) < threshold_date.timestamp():
            os.remove(image_path)
            cleaned += 1
    return {"status": "success", "cleaned": cleaned}
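The cleanup task decides what to delete by comparing a file timestamp against a seven-day threshold. A small sketch of that selection step in isolation, assuming a hypothetical helper `find_stale_files` and using the modification time (`st_mtime`) rather than `getctime`, since the modification time means the same thing on Windows and Unix. It only lists candidates and deletes nothing:

```python
import os
import time


def find_stale_files(directory, max_age_days=7, now=None):
    """Return sorted paths of regular files last modified more than max_age_days ago."""
    now = time.time() if now is None else now
    cutoff = now - max_age_days * 86400
    stale = []
    with os.scandir(directory) as entries:
        for entry in entries:
            # Skip subdirectories and symlinks; only plain files are candidates.
            if not entry.is_file(follow_symlinks=False):
                continue
            if entry.stat(follow_symlinks=False).st_mtime < cutoff:
                stale.append(entry.path)
    return sorted(stale)
```

Passing `now` explicitly makes the threshold easy to test; a task like `cleanup_old_images` could call `os.remove` on each returned path.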
- Create the FastAPI application (app/main.py):
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
import os
from app.tasks import add_watermark_task
from app.celery_app import celery_app

app = FastAPI(title="Image Watermark Service")


@app.post("/upload/")
async def upload_image(
    file: UploadFile = File(...),
    text: str = "Watermark text",
    position: str = "center",
):
    # Save the uploaded file; basename() drops any directory part of the client-supplied name
    file_path = f"images/uploads/{os.path.basename(file.filename)}"
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "wb") as buffer:
        content = await file.read()
        buffer.write(content)
    # Create the asynchronous task
    task = add_watermark_task.delay(file_path, text, position)
    return JSONResponse({
        "status": "success",
        "message": "Image uploaded and added to the processing queue",
        "task_id": task.id,
    })


@app.get("/task/{task_id}")
async def get_task_status(task_id: str):
    task = celery_app.AsyncResult(task_id)
    if task.ready():
        return {"status": "completed", "result": task.result}
    return {"status": "processing"}


@app.get("/tasks/scheduled")
async def get_scheduled_tasks():
    return {"tasks": celery_app.conf.beat_schedule}
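The upload handler derives the storage path from the filename sent by the client, so two uploads with the same name overwrite each other and a name containing path separators needs care. A sketch of one possible sanitizing step, assuming a hypothetical helper `safe_upload_path` that is not part of the original application:

```python
import os
import uuid


def safe_upload_path(filename, upload_dir="images/uploads"):
    """Map a client-supplied filename to a unique path inside upload_dir (hypothetical helper)."""
    # Treat both "/" and "\" as separators so Windows-style names are handled on any OS.
    name = (filename or "").replace("\\", "/").rsplit("/", 1)[-1]
    if name in ("", ".", ".."):
        name = "upload"
    # A random prefix keeps two uploads with the same name from overwriting each other.
    return os.path.join(upload_dir, f"{uuid.uuid4().hex}_{name}")
```

In the handler, `file_path = safe_upload_path(file.filename)` would take the place of the f-string that builds the path.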
- Create the Celery worker entry point (celery_worker.py):
from app.celery_app import celery_app

if __name__ == '__main__':
    celery_app.start()
Usage:
- First, install the dependencies:
pip install -r requirements.txt
- Make sure the RabbitMQ service is running
- Start the FastAPI server:
uvicorn app.main:app --reload
- Start the Celery worker:
celery -A celery_worker.celery_app worker --loglevel=info
- Start Celery beat (scheduled tasks):
celery -A celery_worker.celery_app beat --loglevel=info
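This system provides the following features:
- Upload images through the FastAPI endpoint and watermark them asynchronously
- Use Celery to manage the asynchronous task queue
- Use RabbitMQ as the message broker
- Scheduled tasks:
  - Process pending images automatically every hour
  - Clean up images older than one week every day
- Task status queries
- Listing of scheduled tasks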
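One caveat about the hourly job: `process_images` lists `images/pending` on every run, and none of the tasks shown moves files out of that directory, so the same image may be enqueued again an hour later. A sketch of one possible remedy, assuming a hypothetical helper `claim_pending_images` and a second directory (called `queued_dir` here) that the original project does not define:

```python
import os

IMAGE_EXTENSIONS = (".png", ".jpg", ".jpeg")


def claim_pending_images(pending_dir, queued_dir):
    """Move image files from pending_dir to queued_dir and return their new paths."""
    os.makedirs(queued_dir, exist_ok=True)
    claimed = []
    for name in sorted(os.listdir(pending_dir)):
        source = os.path.join(pending_dir, name)
        if not name.lower().endswith(IMAGE_EXTENSIONS) or not os.path.isfile(source):
            continue
        target = os.path.join(queued_dir, name)
        # os.replace renames the file and overwrites an existing target of the same name.
        os.replace(source, target)
        claimed.append(target)
    return claimed
```

The scheduled task could then call `add_watermark_task.delay(path, ...)` for each returned path, so a file is enqueued only once.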
API endpoints:
- POST /upload/ - upload an image and create a watermark task
- GET /task/{task_id} - query the task status
- GET /tasks/scheduled - list the scheduled tasks
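A client typically calls POST /upload/, keeps the returned `task_id`, and then polls GET /task/{task_id} until the status changes from "processing" to "completed". A minimal polling sketch using only the standard library; the names `wait_for_task` and `http_fetch_status` are hypothetical, and the base URL assumes uvicorn's default address of 127.0.0.1:8000:

```python
import json
import time
import urllib.request


def http_fetch_status(task_id, base_url="http://127.0.0.1:8000"):
    """Fetch GET /task/{task_id} and return the parsed JSON body (base_url is an assumption)."""
    with urllib.request.urlopen(f"{base_url}/task/{task_id}") as resp:
        return json.load(resp)


def wait_for_task(fetch_status, task_id, interval=1.0, timeout=30.0, sleep=time.sleep):
    """Poll until the task reports "completed" or timeout seconds have passed."""
    deadline = time.monotonic() + timeout
    while True:
        body = fetch_status(task_id)
        if body.get("status") == "completed":
            return body.get("result")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} still processing after {timeout}s")
        sleep(interval)
```

With the server running, `wait_for_task(http_fetch_status, task_id)` returns the task's result dictionary. Passing the fetch function as a parameter keeps the polling loop testable without a live server.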