Task management with Celery. This post covers wrapping my own algorithm behind an API, managing task concurrency, and running multiple tasks in parallel.
- Requirements
- Potential issues
- Main program
- Configuration file
Requirements
- Expose an API so that other people can call the algorithm (a minimal client-side sketch follows this list).
- Make it easy to check the status of a submitted task.
- Limit the number of concurrent tasks, because server resources are limited.
- Run multiple tasks concurrently to speed up processing. Python can do this on its own, but: (1) with plain threading, the GIL means the threads are not truly parallel, so all the "concurrent" work still runs on a single CPU core; (2) multiprocessing gives real parallelism, but building a task queue out of a shared array, as in my earlier post, forces you into global state. That works, but it is not elegant, and mature libraries already provide far more functionality. Unless you are restricted to an offline environment or face similar constraints, it is better to use an existing solution, so this post uses Celery + Redis to manage the task queue.
- Manage the task queue: if a task fails, retry it automatically; once the number of retries exceeds the threshold, mark the task as failed and stop retrying.
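To make the first two requirements concrete, here is a minimal client-side sketch of how a caller might submit a task and poll its status. The host, port, and the 'rainfall_file' payload field are assumptions for illustration only; the GPU flag is the one field the service actually inspects.

import time
import requests

BASE_URL = 'http://localhost:5000'  # assumption: where the Flask app is reachable
payload = {'GPU': 1, 'rainfall_file': 'event_01.txt'}  # 'rainfall_file' is a hypothetical field

# Submit a new simulation task
resp = requests.post(f'{BASE_URL}/new_task_rainfall_simulation/', json=payload)
task_id = resp.json()['task_id']

# Poll until the task reaches a terminal state
while True:
    status = requests.get(f'{BASE_URL}/task-status/{task_id}').json()
    if status['state'] in ('SUCCESS', 'FAILURE'):
        print(status)
        break
    time.sleep(5)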
Potential issues
- These choices reflect my own needs: each task occupies a single CPU core. If you want one task to use several CPU cores, consider parallelising inside the algorithm with multiprocessing or something more elegant (a rough sketch follows); I simply did not need that in this project.
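As a rough sketch (not what this project does), one way to let a single task use several cores is to parallelise inside the algorithm with multiprocessing. Here simulate_cell_block and the block list are hypothetical placeholders for chunks of the real computation. Note that spawning processes from inside a Celery prefork worker has caveats (the worker's child processes are daemonic), so this pattern fits better in a standalone run of the algorithm.

from multiprocessing import Pool

def simulate_cell_block(block):
    # placeholder for one chunk of the real simulation
    return sum(block)

def rainfall_simulation_multicore(blocks, ncores=4):
    # distribute the chunks over ncores worker processes
    with Pool(processes=ncores) as pool:
        return pool.map(simulate_cell_block, blocks)

if __name__ == '__main__':
    print(rainfall_simulation_multicore([[1, 2], [3, 4], [5, 6]]))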
Main program
import time
import json
import redis
import logging
from flask import Flask, request, jsonify
from flask_cors import CORS
from celery import Celery
from config import (
    CELERY_BROKER_URL, CELERY_RESULT_BACKEND,
    REDIS_HOST, REDIS_PORT, REDIS_DB, OUTPUT_DIR,
    CELERY_WORKER_CONCURRENCY, CELERY_TASK_MAX_RETRIES, CELERY_TASK_RETRY_DELAY,
)

# your algorithms
from simulator import rainfall_simulation_gpu, rainfall_simulation_cpu

# Flask
app = Flask(__name__)
CORS(app)
app.config.update(
    CELERY_BROKER_URL=CELERY_BROKER_URL,
    CELERY_RESULT_BACKEND=CELERY_RESULT_BACKEND,
    JSON_AS_ASCII=False,
)

# Celery
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(
    result_backend=app.config['CELERY_RESULT_BACKEND'],
    worker_concurrency=CELERY_WORKER_CONCURRENCY,
)

# Redis client used for task deduplication
redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


@celery.task(bind=True, max_retries=CELERY_TASK_MAX_RETRIES, default_retry_delay=CELERY_TASK_RETRY_DELAY)
def long_running_task(self, task_info):
    # A Redis key acts as a deduplication lock; it expires after one hour.
    unique_key = f"task:{hash(str(task_info))}"
    try:
        if not redis_client.set(unique_key, self.request.id, nx=True, ex=3600):
            return {'status': 'Duplicate task, skipping execution'}
        logger.info(f"Processing task: {task_info}")
        if task_info.get('GPU') == 1:
            simulation_status, save_path = rainfall_simulation_gpu(task_info, ngpus=1, output_dir=OUTPUT_DIR)
        else:
            simulation_status, save_path = rainfall_simulation_cpu(task_info, output_dir=OUTPUT_DIR)
        if simulation_status:
            return {'status': 'Task completed!', 'save_path': save_path}
        raise Exception('Simulation failed')
    except Exception as exc:
        logger.error(f"Error in task: {exc}")
        if self.request.retries >= self.max_retries:
            # Retry budget exhausted: release the lock, mark the task as failed, give up.
            redis_client.delete(unique_key)
            self.update_state(state='FAILURE', meta={'exc': str(exc)})
            raise
        raise self.retry(exc=exc)


@app.route('/new_task_rainfall_simulation/', methods=['POST'])
def new_task_rainfall_simulation():
    task_info = json.loads(request.data.decode('utf-8'))
    task = long_running_task.apply_async(args=[task_info])
    return jsonify({'task_id': task.id}), 202


@app.route('/task-status/<task_id>', methods=['GET'])
def task_status(task_id):
    task = long_running_task.AsyncResult(task_id)
    response = {'state': task.state, 'result': task.info if task.state in ['SUCCESS', 'FAILURE'] else None}
    return jsonify(response)


if __name__ == '__main__':
    app.run(host='your ip', port=5000)  # set your server IP and port
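To run the service (a sketch, assuming the file above is saved as main.py and Redis is already running locally), start the Celery worker and the Flask app in two separate terminals:

celery -A main worker --loglevel=info
python main.py

The worker reads its concurrency limit from worker_concurrency in the Celery config, so no --concurrency flag is needed on the command line.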
Configuration file
config.py
# Redis configuration
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0

# Celery configuration
CELERY_BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'
CELERY_RESULT_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'

# General application settings
JSON_AS_ASCII = False

# Simulation configuration
OUTPUT_DIR = 'xxx'  # directory where simulation results are saved

# Celery worker configuration
CELERY_WORKER_CONCURRENCY = 6   # number of tasks processed in parallel; depends on your hardware and algorithm
CELERY_TASK_MAX_RETRIES = 2     # how many times a failed task is retried
CELERY_TASK_RETRY_DELAY = 10    # wait this many seconds before retrying
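If Redis runs on another machine or requires authentication, the broker and result backend use the standard redis:// URL scheme; the password below is a placeholder, not part of this project's setup.

REDIS_PASSWORD = 'your-password'  # placeholder, do not commit real credentials
CELERY_BROKER_URL = f'redis://:{REDIS_PASSWORD}@{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'
CELERY_RESULT_BACKEND = CELERY_BROKER_URL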