基于celery的任務管理,本文主要是處理自己的算法,暴露API,管理任務并發,多線程

基于celery的任務管理,本文主要是處理自己的算法,暴露API,管理任務并發,多線程

  • 基本需求描述
  • 潛在問題
  • 主函數
  • 配置文件

基本需求描述

  • 暴露API,供其他人調用算法。
  • 方便查看任務狀態。
  • 因為服務器資源有限,控制并發數量。
  • 多任務并發加快處理速度。這里需要說明的是python本身是可以做多線程的,但是(1)直接使用threading,GIL的存在導致并不是多線程處理,實際上并發還是一個CPU核在處理;(2)可以使用multiprocessing來實現多線程,但是這里有個問題就是自己如果按照我之前的博客來通過數組來實現一個任務隊列的話,必然會涉及到全局變量問題,這里全局變量是可以實現的,但是不夠優雅,因為有很多現成的庫具備更加強大的功能,所以如果不是離線環境等受多種客觀環境限制,建議還是使用現成的即可,本文這里所以使用了celery+redis的方式來管理任務隊列。
  • 管理任務隊列,某個任務失敗后自動再次嘗試,如果超過嘗試次數閾值則該任務將認為失敗,不會再嘗試處理。

潛在問題

  • 因為是針對我自己的需求,CPU每個核處理一個任務。這意味著如果你想幾個任務多個CPU Core來處理的話,可以考慮算法中multiprocessing或者其他更優雅的方式,只是我在本項目中不需要考慮這個問題。

主函數

import time
import json
import redis
import logging
from flask import Flask, request, jsonify
from flask_cors import CORS
from celery import Celery
from config import CELERY_BROKER_URL, CELERY_RESULT_BACKEND, REDIS_HOST, REDIS_PORT, REDIS_DB, OUTPUT_DIR, CELERY_WORKER_CONCURRENCY, CELERY_TASK_MAX_RETRIES, CELERY_TASK_RETRY_DELAY# your algorithms
from simulator import rainfall_simulation_gpu, rainfall_simulation_cpu# Flask
app = Flask(__name__)
CORS(app)app.config.update(CELERY_BROKER_URL=CELERY_BROKER_URL,CELERY_RESULT_BACKEND=CELERY_RESULT_BACKEND,JSON_AS_ASCII=False,
)celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(result_backend=app.config['CELERY_RESULT_BACKEND'], worker_concurrency=CELERY_WORKER_CONCURRENCY)redis_client = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)@celery.task(bind=True, max_retries=CELERY_TASK_MAX_RETRIES, default_retry_delay=CELERY_TASK_RETRY_DELAY)
def long_running_task(self, task_info):unique_key = f"task:{hash(str(task_info))}"try:if not redis_client.set(unique_key, self.request.id, nx=True, ex=3600):return {'status': 'Duplicate task, skipping execution'}logger.info(f"Processing task: {task_info}")if task_info.get('GPU') == 1:simulation_status, save_path = rainfall_simulation_gpu(task_info, ngpus=1, output_dir=OUTPUT_DIR)else:simulation_status, save_path = rainfall_simulation_cpu(task_info, output_dir=OUTPUT_DIR)if simulation_status:return {'status': 'Task completed!', 'save_path': save_path}raise Exception('Simulation failed')except Exception as exc:logger.error(f"Error in task: {exc}")if self.request.retries >= self.max_retries:redis_client.delete(unique_key)raise self.update_state(state='FAILURE', meta={'exc': str(exc)})raise self.retry(exc=exc)@app.route('/new_task_rainfall_simulation/', methods=['POST'])
def new_task_rainfall_simulation():task_info = json.loads(request.data.decode('utf-8'))task = long_running_task.apply_async(args=[task_info])return jsonify({'task_id': task.id}), 202@app.route('/task-status/<task_id>', methods=['GET'])
def task_status(task_id):task = long_running_task.AsyncResult(task_id)response = {'state': task.state, 'result': task.info if task.state in ['SUCCESS', 'FAILURE'] else None}return jsonify(response)if __name__ == '__main__':app.run(host='your ip', port=5000) // set your server IP and port

配置文件

config.py

# Redis configuration
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_DB = 0# Celery configuration
CELERY_BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'
CELERY_RESULT_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}'# General application settings
JSON_AS_ASCII = False# Simulation configuration
OUTPUT_DIR = 'xxx' //Save simulation results# Celery worker configuration
CELERY_WORKER_CONCURRENCY = 6 // concurrent users, depends on your hardware and algorithms
CELERY_TASK_MAX_RETRIES = 2 // define retry times for each task
CELERY_TASK_RETRY_DELAY = 10 // retry the task after n seconds

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/web/66027.shtml
繁體地址,請注明出處:http://hk.pswp.cn/web/66027.shtml
英文地址,請注明出處:http://en.pswp.cn/web/66027.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Java(7)常用的工具類

1.Collections集合工具類 內置了大量對集合操作的靜態方法&#xff0c;可以通過類名直接調用方法。 方法的種類&#xff1a;最大值max、最小值min、sort排序...詳見API幫助文檔 import java.util.ArrayList; import java.util.Collections; import java.util.List;public cl…

【Varnish】:解決 Varnish 7.6 CDN 靜態資源緩存失效問題

項目場景&#xff1a; 在一個使用Varnish作為反向代理的Web應用中&#xff0c;我們依賴CDN&#xff08;內容分發網絡&#xff09;來緩存靜態資源&#xff08;如圖片、CSS、JavaScript文件等&#xff09;&#xff0c;以提高全球用戶的訪問速度并減輕源站服務器的負載。然而&…

理解機器學習中的參數和超參數

在機器學習中&#xff0c;參數和超參數是兩個重要但不同的概念&#xff0c;它們共同影響模型的性能和表現。以下是它們的定義和區別&#xff0c;以及如何通俗地理解它們&#xff1a; 1. 參數 定義 參數是模型在訓練過程中自動學習到的變量&#xff0c;它們直接決定了模型如何…

Win11右鍵菜單實現

主要參考Win11 Context Menu Demo 此工程是vs2022編譯&#xff0c;vs2019先修改下 base.h 方可編譯過 編譯好dll以后 拷貝至SparsePackage目錄下 生成稀疏包msix 就拿他工程里面的改&#xff0c;編輯AppxManifest.xml&#xff0c;配置都要對&#xff0c;一個不對可能都失敗&a…

R.swift庫的詳細用法

R.swift 是一個 Swift 工具庫,它提供了一個自動生成的類 R,使得你可以通過類型安全的方式訪問項目中的資源,例如圖片、字體、顏色、XIB 文件等。通過 R.swift,你可以避免字符串類型的錯誤,提升代碼的可維護性。 以下是 R.swift 庫的詳細用法: 1. 安裝 R.swift 使用 Sw…

像JSONDecodeError: Extra data: line 2 column 1 (char 134)這樣的問題怎么解決

問題介紹 今天處理返回的 JSON 的時候&#xff0c;出現了下面這樣的問題&#xff1a; 處理這種問題的時候&#xff0c;首先你要看一下當前的字符串格式是啥樣的&#xff0c;比如我查看后發現是下面這樣的&#xff1a; 會發現這個字符串中間沒有逗號&#xff0c;也就是此時的J…

what?ngify 比 axios 更好用,更強大?

文章目錄 前言一、什么是ngify&#xff1f;二、npm安裝三、發起請求3.1 獲取 JSON 數據3.2 獲取其他類型的數據3.3 改變服務器狀態3.4 設置 URL 參數3.5 設置請求標頭3.6 與服務器響應事件交互3.7 接收原始進度事件3.8 處理請求失敗3.9 Http Observables 四、更換 HTTP 請求實現…

Linux Kernel 之十 詳解 PREEMPT_RT、Xenomai 的架構、源碼、構建及使用

概述 現在的 RTOS 基本可以分為 Linux 陣營和非 Linux 陣營這兩大陣營。非 Linux 陣營的各大 RTOS 都是獨立發展,使用上也相對獨立;而 Linux 陣營則有多種不同的實現方法來改造 Linux 以實現實時性要求。本文我們重點關注 Linux 陣營的實時內核實現方法! 本文我們重點關注 …

【拒絕算法PUA】3065. 超過閾值的最少操作數 I

系列文章目錄 【拒絕算法PUA】0x00-位運算 【拒絕算法PUA】0x01- 區間比較技巧 【拒絕算法PUA】0x02- 區間合并技巧 【拒絕算法PUA】0x03 - LeetCode 排序類型刷題 【拒絕算法PUA】LeetCode每日一題系列刷題匯總-2025年持續刷新中 C刷題技巧總結&#xff1a; [溫習C/C]0x04 刷…

ClickHouse-CPU、內存參數設置

常見配置 1. CPU資源 1、clickhouse服務端的配置在config.xml文件中 config.xml文件是服務端的配置&#xff0c;在config.xml文件中指向users.xml文件&#xff0c;相關的配置信息實際是在users.xml文件中的。大部分的配置信息在users.xml文件中&#xff0c;如果在users.xml文…

《自動駕駛與機器人中的SLAM技術》ch9:自動駕駛車輛的離線地圖構建

目錄 1 點云建圖的流程 2 前端實現 2.1 前端流程 2.2 前端結果 3 后端位姿圖優化與異常值剔除 3.1 兩階段優化流程 3.2 優化結果 ① 第一階段優化結果 ② 第二階段優化結果 4 回環檢測 4.1 回環檢測流程 ① 遍歷第一階段優化軌跡中的關鍵幀。 ② 并發計算候選回環對…

type 屬性的用途和實現方式(圖標,表單,數據可視化,自定義組件)

1.圖標類型 <uni-icon>組件中&#xff0c;type可以用來指定圖標的不同樣式。 <uni-icons type"circle" size"30" color"#007aff"></uni-icons> //表示圓形 <uni-icons type"square" size"30" co…

網絡基礎知識指南|1-20個

1. IP地址: 即互聯網協議地址&#xff0c;是用于標識互聯網上的每一個設備或節點的唯一地址。IP地址的作用主要是進行網絡設備的定位和路由&#xff0c;確保數據包可以從源設備準確地傳送到目標設備。2. 子網掩碼: 是用于將一個IP地址劃分為網絡地址和主機地址的工具。它通常與…

GPT 系列論文精讀:從 GPT-1 到 GPT-4

學習 & 參考資料 前置文章 Transformer 論文精讀 機器學習 —— 李宏毅老師的 B 站搬運視頻 自監督式學習(四) - GPT的野望[DLHLP 2020] 來自獵人暗黑大陸的模型 GPT-3 論文逐段精讀 —— 沐神的論文精讀合集 GPT&#xff0c;GPT-2&#xff0c;GPT-3 論文精讀【論文精讀】…

lombok在高版本idea中注解不生效的解決

環境&#xff1a; IntelliJ IDEA 2024.3.1.1 Spring Boot Maven 問題描述 使用AllArgsConstructor注解一個用戶類&#xff0c;然后調用全參構造方法創建對象&#xff0c;出現錯誤&#xff1a; java: 無法將類 com.itheima.pojo.User中的構造器 User應用到給定類型; 需要:…

145.《redis原生超詳細使用》

文章目錄 什么是redisredis 安裝啟動redis數據類型redis key操作key 的增key 的查key 的改key 的刪key 是否存在key 查看所有key 「設置」過期時間key 「查看」過期時間key 「移除」過期時間key 「查看」數據類型key 「匹配」符合條件的keykey 「移動」到其他數據庫 redis數據類…

大數據技術Kafka詳解 ⑤ | Kafka中的CAP機制

目錄 1、分布式系統當中的CAP理論 1.1、CAP理論 1.2、Partitiontolerance 1.3、Consistency 1.4、Availability 2、Kafka中的CAP機制 C軟件異常排查從入門到精通系列教程&#xff08;核心精品專欄&#xff0c;訂閱量已達600多個&#xff0c;歡迎訂閱&#xff0c;持續更新…

riscv架構下linux4.15實現early打印

在高版本linux6.12.7源碼中&#xff0c;early console介紹&#xff0c;可參考《riscv架構下linux6.12.7實現early打印》文章。 1 什么是early打印 適配內核到新的平臺&#xff0c;基本環境搭建好之后&#xff0c;首要的就是要調通串口&#xff0c;方便后面的信息打印。 正常流…

improve-gantt-elastic(vue2中甘特圖實現與引入)

1.前言 項目開發中需要使用甘特圖展示項目實施進度&#xff0c;左側為表格計劃&#xff0c;右側為圖表進度展示。wl-gantt-mater&#xff0c;dhtmlx嘗試使用過可拓展性受到限制。gantt-elastic相對簡單&#xff0c;可操作性強&#xff0c;基礎版本免費。 甘特圖&#xff08;Gan…

力扣 全排列

回溯經典例題。 題目 通過回溯生成所有可能的排列。每次遞歸時&#xff0c;選擇一個數字&#xff0c;直到選滿所有數字&#xff0c;然后記錄當前排列&#xff0c;回到上層時移除最后選的數字并繼續選擇其他未選的數字。每次遞歸時&#xff0c;在 path 中添加一個新的數字&…