Python FastAPI + Celery + RabbitMQ 分布式圖片水印處理系統

  1. FastAPI 服務器
  2. Celery 任務隊列
  3. RabbitMQ 作為消息代理
  4. 定時任務處理

首先創建項目結構:

c:\Users\Administrator\Desktop\meitu\
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── celery_app.py
│   ├── tasks.py
│   └── config.py
├── requirements.txt
└── celery_worker.py
  1. 首先創建 requirements.txt:
fastapi==0.104.1
uvicorn==0.24.0
celery==5.3.4
python-dotenv==1.0.0
requests==2.31.0
  1. 創建配置文件:
from dotenv import load_dotenv
import osload_dotenv()# RabbitMQ配置
RABBITMQ_HOST = os.getenv("RABBITMQ_HOST", "localhost")
RABBITMQ_PORT = os.getenv("RABBITMQ_PORT", "5672")
RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest")# Celery配置
CELERY_BROKER_URL = f"amqp://{RABBITMQ_USER}:{RABBITMQ_PASS}@{RABBITMQ_HOST}:{RABBITMQ_PORT}//"
CELERY_RESULT_BACKEND = "rpc://"# 定時任務配置
CELERY_BEAT_SCHEDULE = {'process-images-every-hour': {'task': 'app.tasks.process_images','schedule': 3600.0,  # 每小時執行一次},'daily-cleanup': {'task': 'app.tasks.cleanup_old_images','schedule': 86400.0,  # 每天執行一次}
}
  1. 創建 Celery 應用:
from celery import Celery
from app.config import CELERY_BROKER_URL, CELERY_RESULT_BACKEND, CELERY_BEAT_SCHEDULEcelery_app = Celery('image_processing',broker=CELERY_BROKER_URL,backend=CELERY_RESULT_BACKEND,include=['app.tasks']
)# 配置定時任務
celery_app.conf.beat_schedule = CELERY_BEAT_SCHEDULE
celery_app.conf.timezone = 'Asia/Shanghai'
  1. 創建任務文件:
from app.celery_app import celery_app
from app.watermark import ImageWatermarker
import os
from datetime import datetime, timedelta@celery_app.task
def add_watermark_task(image_path, text, position='center'):"""異步添加水印任務"""watermarker = ImageWatermarker()try:result_path = watermarker.add_watermark(image_path=image_path,text=text,position=position)return {"status": "success", "output_path": result_path}except Exception as e:return {"status": "error", "message": str(e)}@celery_app.task
def process_images():"""定時處理圖片任務"""image_dir = "images/pending"if not os.path.exists(image_dir):return {"status": "error", "message": "Pending directory not found"}processed = 0for image in os.listdir(image_dir):if image.lower().endswith(('.png', '.jpg', '.jpeg')):add_watermark_task.delay(os.path.join(image_dir, image),"自動處理水印",'center')processed += 1return {"status": "success", "processed": processed}@celery_app.task
def cleanup_old_images():"""清理舊圖片任務"""output_dir = "images/processed"if not os.path.exists(output_dir):return {"status": "error", "message": "Output directory not found"}threshold_date = datetime.now() - timedelta(days=7)cleaned = 0for image in os.listdir(output_dir):image_path = os.path.join(output_dir, image)if os.path.getctime(image_path) < threshold_date.timestamp():os.remove(image_path)cleaned += 1return {"status": "success", "cleaned": cleaned}
  1. 創建 FastAPI 應用:
from fastapi import FastAPI, File, UploadFile, BackgroundTasks
from fastapi.responses import JSONResponse
import os
from app.tasks import add_watermark_task
from app.celery_app import celery_appapp = FastAPI(title="圖片水印處理服務")@app.post("/upload/")
async def upload_image(file: UploadFile = File(...),text: str = "水印文本",position: str = "center"
):# 保存上傳的文件file_path = f"images/uploads/{file.filename}"os.makedirs(os.path.dirname(file_path), exist_ok=True)with open(file_path, "wb") as buffer:content = await file.read()buffer.write(content)# 創建異步任務task = add_watermark_task.delay(file_path, text, position)return JSONResponse({"status": "success","message": "圖片已上傳并加入處理隊列","task_id": task.id})@app.get("/task/{task_id}")
async def get_task_status(task_id: str):task = celery_app.AsyncResult(task_id)if task.ready():return {"status": "completed", "result": task.result}return {"status": "processing"}@app.get("/tasks/scheduled")
async def get_scheduled_tasks():return {"tasks": celery_app.conf.beat_schedule}
  1. 創建 Celery worker 啟動文件:
from app.celery_app import celery_appif __name__ == '__main__':celery_app.start()

使用說明:

  1. 首先安裝依賴:
pip install -r requirements.txt
  1. 確保 RabbitMQ 服務已啟動

  2. 啟動 FastAPI 服務器:

uvicorn app.main:app --reload
  1. 啟動 Celery worker:
celery -A celery_worker.celery_app worker --loglevel=info
  1. 啟動 Celery beat(定時任務):
celery -A celery_worker.celery_app beat --loglevel=info

這個系統提供以下功能:

  1. 通過 FastAPI 接口上傳圖片并異步處理水印
  2. 使用 Celery 處理異步任務隊列
  3. 使用 RabbitMQ 作為消息代理
  4. 支持定時任務:
    • 每小時自動處理待處理圖片
    • 每天清理一周前的舊圖片
  5. 支持任務狀態查詢
  6. 支持查看計劃任務列表

API 端點:

  • POST /upload/ - 上傳圖片并創建水印任務
  • GET /task/{task_id} - 查詢任務狀態
  • GET /tasks/scheduled - 查看計劃任務列表

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/74176.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/74176.shtml
英文地址,請注明出處:http://en.pswp.cn/web/74176.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【藍橋杯】每日練習 Day18

目錄 前言 動態求連續區間和 分析 代碼 數星星 分析 代碼 星空之夜 分析 代碼 前言 接下來是今天的題目&#xff08;本來是有四道題的但是有一道題是前面講過&#xff08;逆序數的&#xff0c;感興趣的小伙伴可以去看我歸并排序的那一篇&#xff09;的我就不再過多贅…

基于銀河麒麟桌面服務器操作系統的 DeepSeek本地化部署方法【詳細自用版】

一、3種方式使用DeepSeek 1.本地部署 服務器操作系統環境進行,具體流程如下(桌面環境步驟相同): 本例所使用銀河麒麟高級服務器操作系統版本信息: (1)安裝ollama 方式一:按照ollama官網的下載指南,執行如下命令: curl -fsSL https://ollama.com/install.sh | sh方…

Python入門(7):Python序列結構-字典

字典Dictionary 字典(dictionary)和列表類似&#xff0c;也是可變序列&#xff0c;不過與列表不同&#xff0c;它是無序的可變序列&#xff0c;保存的為容是以“鍵-值對”的形式存放的。 Python 中的字典相當于 Java 或者 C中的 Map 對象。在C#中,就是Dictionary<TKey,TVa…

Flutter項目之構建打包分析

目錄&#xff1a; 1、準備部分2、構建Android包2.1、配置修改部分2.2、編譯打包 3、構建ios包3.1、配置修改部分3.2、編譯打包 1、準備部分 2、構建Android包 2.1、配置修改部分 2.2、編譯打包 執行flutter build apk命令進行打包。 3、構建ios包 3.1、配置修改部分 3.2、編譯…

不用再付費~全網書源一鍵下載,實現閱讀自由!!!

現在市面上有許多免費你看書的軟件&#xff0c;但都軟件內太多廣告彈窗&#xff0c;這無疑是很煩&#xff0c;有事一不小心點進去就下載了軟件&#xff0c;簡直讓人頭大&#xff01; 如果你遇到這樣的難題那么就應該看下本文~ 這是一款能一鍵將在線連載小說整合下載成標準格式&…

GCC RISCV 后端 -- GIMPLE IR 表示的一些理解

C/C源代碼經過 GCC 解析&#xff08;Parse&#xff09;及轉換后&#xff0c;通過 GIMPLE IR 予以表示&#xff08;Representation&#xff09;。其中&#xff0c;一個C/C源文件&#xff0c;通過 宏處理后&#xff0c;形成一個 轉譯單元&#xff08;Translation Unit&#xff09…

JAVA設計模式之適配器模式《太白金星有點煩》

太白金星握著月光凝成的鼠標&#xff0c;第108次檢查南天門服務器的運行日志。這個剛從天樞院調來的三等仙官&#xff0c;此刻正盯著瑤池主機房里的青銅鼎發愁——鼎身上"天地同壽"的云紋間&#xff0c;漂浮著三界香火系統每分鐘吞吐的十萬條功德數據。看著居高不下的…

以太坊DApp開發腳手架:Scaffold-ETH 2 詳細介紹與搭建教程

一、什么是Scaffold-ETH 2 Scaffold-ETH 2是一個開源的最新工具包&#xff0c;類似于腳手架。用于在以太坊區塊鏈上構建去中心化應用程序 &#xff08;DApp&#xff09;。它旨在使開發人員更容易創建和部署智能合約&#xff0c;并構建與這些合約交互的用戶界面。 Scaffold-ETH…

畢業設計:實現一個基于Python、Flask和OpenCV的人臉打卡Web系統(六)

畢業設計:實現一個基于Python、Flask和OpenCV的人臉打卡Web系統(六) Flask Flask是一個使用 Python 編寫的輕量級 Web 應用框架。其 WSGI 工具箱采用 Werkzeug ,模板引擎則使用 Jinja2 。Flask使用 BSD 授權。 Flask也被稱為 “microframework” ,因為它使用簡單的核心,…

第十一章 VGA顯示圖片(還不會)

FPGA至簡設計實例 前言 一、項目背景 1. IP核概述 IP 核(Intellectual Property core)指的是知識產權核或知識產權模塊,其是具有特定電路功能的硬件描述語言程序,在EDA技術開發中具有十分重要的地位。美國著名的Dataquest咨詢公司將 半導體產業的IP定義為“用于ASIC或FPGA…

浙江大學公開課|第二季|從大模型、智能體到復雜AI應用系統的構建——以產業大腦為例

大家好&#xff0c;我是吾鳴。 前沿回顧 吾鳴之前給大家分享過浙江大學DeepSeek系列公開課第一季&#xff0c;第一季一共八講&#xff0c;內容介紹豐富&#xff0c;內容之廣&#xff0c;看完粉絲朋友直呼浙大良心。這八講公開課名稱分別是&#xff1a; 第一期&#xff08;上&…

Spring AOP中為所有類型通知傳遞參數的完整示例,包含詳細注釋和參數傳遞方式

以下是Spring AOP中為所有類型通知傳遞參數的完整示例&#xff0c;包含詳細注釋和參數傳遞方式&#xff1a; // 1. 目標類&#xff08;被增強的類&#xff09; package com.example;public class TargetService {public void doTask(String param) {System.out.println("…

【Git教程】將dev分支合并到master后,那么dev分支該如何處理

將 dev 合并到 master 后的分支狀態與操作指南 1. 合并后的分支狀態 dev 分支不會消失&#xff1a; Git 的 git merge 命令僅將 dev 的內容合并到 master&#xff0c;不會刪除 dev 分支。合并后&#xff0c;dev 分支仍然存在&#xff0c;其歷史記錄和代碼保持不變。 分支的 H…

【go】異常處理panic和recover

panic 和 recover 當然能觸發程序宕機退出的&#xff0c;也可以是我們自己&#xff0c;比如經過檢查判斷&#xff0c;當前環境無法達到我們程序進行的預期條件時&#xff08;比如一個服務指定監聽端口被其他程序占用&#xff09;&#xff0c;可以手動觸發 panic&#xff0c;讓…

CSS層疊順序

介紹 在 CSS 中&#xff0c;元素的層疊順序決定了當多個元素重疊時&#xff08;跟布局沒有完全的關系&#xff0c;也就是說層疊順序只會在幾個疊放元素上進行比較&#xff0c;而不會改變布局&#xff09;&#xff0c;哪個元素顯示在最上面&#xff0c;哪個元素顯示在最下面。 …

數制——FPGA

1、定點數 定點數的三種表示方式&#xff1a; 原碼&#xff1a;符號位 絕對值 表示方法 反碼&#xff1a;正數的反碼表示 與原碼表示一致&#xff0c;負數的反碼表示 除符號位&#xff0c;其他位全都取反 補碼&#xff1a;正數的補碼表示 與原碼表示一致&#xff0c;負數的補碼…

在用redis當中可能遇到的問題解決方案以及redis中的一些名詞解釋

在用redis當中可能遇到的問題解決方案以及redis中的一些名詞解釋 Redis篇一、緩存穿透&#xff1a;解決方案&#xff1a;緩存空數據布隆過濾器 二、緩存擊穿解決方案互斥鎖&#xff0c;強一致性&#xff0c;性能差&#xff0c;速度慢邏輯過期&#xff0c;數據不同步&#xff0c…

一文詳解QT環境搭建:Windows使用CLion配置QT開發環境

在當今的軟件開發領域&#xff0c;跨平臺應用的需求日益增長&#xff0c;Qt作為一款流行的C圖形用戶界面庫&#xff0c;因其強大的功能和易用性而備受開發者青睞。與此同時&#xff0c;CLion作為一款專為C/C打造的強大IDE&#xff0c;提供了豐富的特性和高效的編碼體驗。本文將…

【區塊鏈安全 | 第二十四篇】單位和全局可用變量(二)

文章目錄 單位和全局可用變量&#xff08;Units and Globally Available Variables&#xff09;特殊變量和函數1. 區塊和交易屬性2. ABI 編碼和解碼函數3. bytes 成員函數4. string 成員函數5. 錯誤處理6. 數學和加密函數7. 地址類型成員函數8. 與合約相關9. 類型信息 單位和全…

一種監控錄像視頻恢復的高效解決方案,從每一幀中尋找可能性

該軟件旨在恢復從監控設備中刪除或丟失的視頻。該程序經過調整以處理大多數流行供應商的閉路電視系統中使用的專有格式&#xff0c;并通過智能重建引擎進行了增強&#xff0c;能夠為監控記錄提供任何通用解決方案都無法實現的恢復結果。如果不需要持續使用該軟件&#xff0c;則…