三十七、【高級特性篇】定時任務:基于 APScheduler 實現測試計劃的靈活調度
-
- 前言
-
- 準備工作
- 第一部分:后端實現 - `APScheduler` 集成與任務調度
-
- 1. 安裝 `django-apscheduler`
- 2. 配置 `django-apscheduler`
- 3. 數據庫遷移
- 4. 創建調度觸發函數
- 5. 啟動 APScheduler 調度器
- 6. 創建定時任務管理的 API
- 7. 后端初步測試
- 第二部分:前端實現 - 定時任務管理界面
-
- 1. 創建 API 服務 (`src/api/scheduler.ts`)
- 2. 添加定時任務路由和側邊欄入口
- 3. 實現定時任務列表頁面 (`src/views/system/ScheduledJobListView.vue`)
- 4. 實現定時任務創建/編輯對話框 (`src/views/system/ScheduledJobEditView.vue`)
- 第三部分:后端 `ScheduledJobSerializer` 增強 (以支持回顯觸發器配置)
- 第四部分:全面測試
- 總結
前言
定時任務是自動化測試平臺的核心功能之一,它允許我們設置測試計劃在預定的時間或周期自動執行,從而實現無人值守的自動化回歸測試、持續集成/部署后的冒煙測試等場景。
為什么選擇 APScheduler
和 django-apscheduler
?
APScheduler
(Advanced Python Scheduler): 一個輕量級且功能強大的 Python 任務調度庫。它支持多種觸發器(cron
模式、interval
模式、date
模式),非常靈活。django-apscheduler
:APScheduler
與 Django 的良好集成。它將 APScheduler 的調度信息(任務配置、下次運行時間等)直接存儲在 Django 的數據庫中,可以通過 Django ORM 來管理和查詢定時任務,也方便通過 Django Admin 或自定義界面進行配置。- 與 Celery 配合: 如果定時任務本身是一個耗時操作(如執行一個包含大量用例的測試計劃),直接在 APScheduler 的調度線程中執行會阻塞調度器,導致其他定時任務無法準時觸發,甚至出現問題。所以,讓 APScheduler 的定時任務只做一件輕量級的事情——將一個真正的耗時任務(即我們之前創建的 Celery 異步執行任務
execute_test_plan_task
)提交到 Celery 任務隊列中。 這樣,調度器的穩定性不受測試執行時間長短的影響,同時又能利用 Celery 的異步、分布式處理能力。
準備工作
- Django 后端項目就緒: 確保
test-platform/backend
項目結構完整,Celery 和 Redis 已配置并運行。 - Vue3 前端項目就緒。
- Axios 和 API 服務已封裝。
- Element Plus 集成完畢。
第一部分:后端實現 - APScheduler
集成與任務調度
1. 安裝 django-apscheduler
在你的 Django 項目的虛擬環境中運行:
pip install django-apscheduler
2. 配置 django-apscheduler
打開 test-platform/backend/settings.py
:
a. 添加到 INSTALLED_APPS
:
# test-platform/backend/settings.py
# ...
INSTALLED_APPS = [# ... 其他應用 ...'django_apscheduler', # 添加這一行# ...
]
# ...
# APScheduler 配置
SCHEDULER_CONFIG = {"apscheduler.jobstores": {"default": {"class": "django_apscheduler.jobstores:DjangoJobStore"}},"apscheduler.executors": {"default": {"class": "apscheduler.executors.pool:ThreadPoolExecutor","max_workers": "20"}},"apscheduler.job_defaults": {"coalesce": False,"max_instances": 3},"apscheduler.timezone": TIME_ZONE
}# django_apscheduler 配置
APSCHEDULER_DATETIME_FORMAT = "N j, Y, f:s a" # 默認時間格式
APSCHEDULER_RUN_NOW_TIMEOUT = 25 # 秒
# --- djangorestframework-simplejwt 設置 ---
b. 添加 APSCHEDULER
相關配置:
# test-platform/backend/settings.py
# ...
APSCHEDULER_DATETIME_FORMAT = "YYYY-MM-DD HH:mm:ss" # 日期時間格式
APSCHEDULER_RUN_NOW_TIMEOUT = 25 # 立即運行任務的超時時間(秒)
3. 數據庫遷移
運行 python manage.py migrate
,django-apscheduler
會自動在數據庫中創建管理調度任務所需的表。
4. 創建調度觸發函數
這個函數是 APScheduler
將要調度的目標。它會接收測試計劃ID,然后將真正的執行任務提交給 Celery。
a. 在 api
目錄下創建 scheduler_jobs.py
文件,填入以下代碼:
# test-platform/api/scheduler_jobs.py
import logging
from django.utils import timezone
from api.models import TestPlan, TestRun # 導入 TestPlan 和 TestRun 模型
from api.tasks import execute_test_plan_task # 導入 Celery 任務
from api.utils.log_utils import record_operation_log # 導入操作日志工具logger = logging.getLogger(__name__)def trigger_test_plan_execution_job(test_plan_id: int):"""APScheduler 定時任務觸發函數。此函數僅負責將測試計劃執行任務提交到 Celery 異步隊列。"""try:test_plan = TestPlan.objects.get(id=test_plan_id)# 1. 創建 TestRun 記錄,初始狀態為 PENDINGcurrent_time = timezone.now().strftime("%Y-%m-%d %H:%M:%S")initial_run_name = f"{test_plan.name} - (定時任務觸發) {current_time}"test_run = TestRun.objects.create(test_plan=test_plan,name=initial_run_name,description=f"定時任務觸發執行: {test_plan.name}",status='PENDING',total_cases=test_plan.test_cases.count() # 預估總數)# 2. 調用 Celery 任務異步執行task_result = execute_test_plan_task.delay(test_plan.id, str(test_run.id))# 3. 記錄操作日志record_operation_log(user=None, # 由調度器觸發,沒有直接用戶action_type='EXECUTE',target_resource='測試計劃',target_id=test_plan.id,description=f"定時任務觸發執行測試計劃: '{test_plan.name}' (ID: {test_plan.id}), TestRun ID: {test_run.id}, Celery Task ID: {task_result.id}",details={"trigger_type": "scheduled", "test_run_id": str(test_run.id), "celery_task_id": task_result.id})logger.info(f"定時任務成功提交測試計劃 (ID: {test_plan.id}) 執行到 Celery. TestRun ID: {test_run.id}, Celery Task ID: {task_result.id}")except TestPlan.DoesNotExist:logger.error(f"APScheduler 任務執行失敗: 測試計劃 (ID: {test_plan_id}) 未找到。")except Exception as e:logger.error(f"APScheduler 任務執行過程中發生未知錯誤 for plan ID {test_plan_id}: {e}", exc_info=True)
5. 啟動 APScheduler 調度器
django-apscheduler
提供了兩種啟動調度器的方式:
a. 在 Django 應用啟動時自動啟動 (不推薦用于生產環境):在 apps.py
中,但會導致開發服務器重載時重復啟動,并可能在多進程部署時引發問題。
b. 作為獨立的 Management Command 啟動:這是更健壯和推薦的方式,非常適合生產環境。
我們采用第二種方式。
a. 創建 Management Command 文件:
在 test-platform/api/management/commands/
目錄下創建 runapscheduler.py
文件,填入以下代碼:
# test-platform/api/management/commands/runapscheduler.py
import loggingfrom django.conf import settingsfrom apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution
from django_apscheduler import utilfrom django.core.management.base import BaseCommand# 導入你的 APScheduler 定時任務函數
from api.scheduler_jobs import trigger_test_plan_execution_joblogger = logging.getLogger(__name__)# 定義清理舊作業執行記錄的函數
@util.close_old_connections
def delete_old_job_executions(max_age=604_800):"""刪除超過指定時間(默認7天)的舊作業執行記錄。這個函數本身也可以被 APScheduler 定時調度。"""DjangoJobExecution.objects.delete_old_job_executions(max_age)class Command(BaseCommand):help = "Runs APScheduler."def handle(self, *args, **options):# 創建一個后臺調度器實例scheduler = BackgroundScheduler(timezone=settings.TIME_ZONE)# 將 DjangoJobStore 添加到調度器scheduler.add_jobstore(DjangoJobStore(), "default")# 添加一個定時清理舊作業執行記錄的任務 (可選)# 這個任務本身由 APScheduler 調度,每12小時執行一次scheduler.add_job(delete_old_job_executions,trigger=IntervalTrigger(hours=12),id="delete_old_job_executions", # 指定一個 ID,方便管理max_instances=1, # 確保只有一個實例在運行replace_existing=True, # 如果已有同ID任務,則替換# misfire_grace_time=3600, # 如果任務錯過,延遲1小時內仍可執行)logger.info("Added job 'delete_old_job_executions'.")# 打印當前所有已注冊的 APScheduler 作業try:logger.info("Starting scheduler...")scheduler.start()except KeyboardInterrupt:logger.info("Stopping scheduler...")scheduler.shutdown()logger.info("Scheduler shut down successfully!")except Exception as e:logger.error(f"Scheduler startup failed: {e}", exc_info=True)scheduler.shutdown()
b. 運行調度器:
打開一個新的終端窗口 (除了 Django 開發服務器和 Celery Worker 的終端),激活虛擬環境,然后在 test-platform
目錄下運行:
python manage.py runapscheduler
如果調度器成功啟動,你會看到日志輸出,并提示添加了 delete_old_job_executions
任務。
6. 創建定時任務管理的 API
我們將為 django-apscheduler
的 DjangoJob
模型提供 RESTful API,以便前端進行管理。
a. 在 api/serializers.py
中添加 ScheduledJobSerializer
:
# test-platform/api/serializers.py
# ...
from django_apscheduler.models import DjangoJob, DjangoJobExecution # 導入 APScheduler 的模型class ScheduledJobSerializer(serializers.ModelSerializer):test_plan_name = serializers.SerializerMethodField(read_only=True)job_type = serializers.CharField(source='job_func_name', read_only=True) # 方便前端顯示函數名class Meta:model = DjangoJobfields = ['id', 'name', 'job_type', 'job_func_name', 'job_arguments', 'job_kwargs', 'job_state', 'next_run_time', 'start_date', 'end_date', 'max_instances', 'misfire_grace_time', 'coalesce', 'jobstore','test_plan_name' # 關聯的測試計劃名稱]read_only_fields = ['job_type', 'job_func_name', 'job_state', 'next_run_time']extra_kwargs = {'job_arguments': {'required': False, 'allow_null': True}, # 允許 job_arguments 為空'job_kwargs': {'required': False, 'allow_null': True}, # 允許 job_kwargs 為空'job_state': {'required': False, 'read_only': True}}def get_test_plan_name(self, obj: DjangoJob):"""從 job_arguments 中解析 test_plan_id 并獲取其名稱"""if obj.job_arguments:try:# job_arguments 是一個元組的 JSON 字符串,例如 '["1"]'args_list = json.loads(obj.job_arguments)if args_list and isinstance(args_list, list) and len(args_list) > 0:test_plan_id = args_list[0]if isinstance(test_plan_id, (int, str)):try:test_plan = TestPlan.objects.get(id=int(test_plan_id))return test_plan.nameexcept TestPlan.DoesNotExist:return f"未知計劃 (ID: {test_plan_id})"except (json.JSONDecodeError, IndexError, ValueError):passreturn "N/A"def create(self, validated_data: dict):# 在創建前,需要將 trigger_type 和 trigger_config 轉換為 APScheduler 的 trigger 參數# 這部分邏輯通常在 ViewSet 的 create 方法中處理return super().create(validated_data)def update