在現代 Web 開發中,異步任務處理和用戶通知是兩個重要的功能。由于老舊測試平臺【測試用例生成平臺,源碼分享】進行智能化升級后,未采用異步任務處理,大模型推理時間較長,導致任務阻塞,無法處理其他任務,體驗較差。本文將以 Django 框架為基礎,結合 Celery 和 Redis,完成一個完整的異步任務系統,并實現用戶通知功能。我們還會對接阿里云百煉 DeepSeek-R1 模型,通過大模型生成測試用例,并提供數據的增刪改查功能。以下是詳細的實現步驟和完整代碼示例。
一、需求分析與功能拆解
1.1 功能需求
本項目的主要需求如下:
- 用戶通過 API 輸入任務名稱和字段信息,系統為任務生成唯一的 UUID,并提示用戶任務已創建。
- 異步任務系統調用阿里云百煉 DeepSeek-R1 模型,傳遞字段信息生成 JSON 格式的測試用例。
- 將模型生成的測試用例存儲到數據庫的測試用例預覽表中。
- 通知用戶任務完成,支持通過 WebSocket 或郵件通知。
- 提供一個頁面對生成的測試用例數據進行增刪改查操作。
1.2 技術選型
- Django:作為后端框架,負責處理 API 請求、任務狀態管理和數據庫操作。
- Celery:用于實現任務的異步處理。
- Redis:作為 Celery 的消息隊列。
- 阿里云百煉 DeepSeek-R1:調用大模型接口生成測試用例。
- Django Channels:實現 WebSocket 通知功能。
- 前端頁面:用于展示和管理測試用例數據。
二、項目創建與環境配置
2.1 安裝依賴
在開始開發之前,需要安裝以下依賴工具和庫:
pip install django celery redis requests channels
2.2 項目目錄結構
項目的目錄結構如下:
your_project_name/
├── your_project_name/
│ ├── __init__.py
│ ├── settings.py
│ ├── urls.py
│ ├── asgi.py
│ ├── celery.py
├── your_app_name/
│ ├── __init__.py
│ ├── models.py
│ ├── views.py
│ ├── tasks.py
│ ├── signals.py
│ ├── urls.py
├── manage.py
三、系統實現
3.1 配置 Celery 和 Redis
3.1.1 在 settings.py
中配置 Celery
# Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'# Django Channels 配置
ASGI_APPLICATION = 'your_project_name.asgi.application'
CHANNEL_LAYERS = {'default': {'BACKEND': 'channels_redis.core.RedisChannelLayer','CONFIG': {'hosts': [('127.0.0.1', 6379)],},},
}
3.1.2 創建 celery.py
在項目根目錄下創建 celery.py
文件,用于初始化 Celery:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery# 設置 Django 默認的 settings 模塊
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project_name.settings')app = Celery('your_project_name')# 從 Django 的 settings.py 加載配置
app.config_from_object('django.conf:settings', namespace='CELERY')# 自動發現任務模塊
app.autodiscover_tasks()
3.1.3 配置 __init__.py
在項目目錄下的 __init__.py
文件中添加以下內容:
from __future__ import absolute_import, unicode_literals# Celery 應用
from .celery import app as celery_app__all__ = ('celery_app',)
3.1.4 配置 asgi.py
在項目根目錄下的 asgi.py
文件中配置 WebSocket 支持:
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStackos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'your_project_name.settings')application = ProtocolTypeRouter({'http': get_asgi_application(),'websocket': AuthMiddlewareStack(URLRouter([# WebSocket 路由])),
})
3.2 數據模型設計
在 models.py
中定義兩張表:任務表 和 測試用例預覽表。
from django.db import models
import uuid# 測試用例預覽表
class TestCasePreview(models.Model):id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)field_name = models.CharField(max_length=255)field_type = models.CharField(max_length=255)field_value = models.TextField()generated_test_case = models.JSONField() # 存儲生成的 JSON 測試用例created_at = models.DateTimeField(auto_now_add=True)# 異步任務表
class AsyncTask(models.Model):TASK_STATUS = (('PENDING', 'Pending'),('PROCESSING', 'Processing'),('COMPLETED', 'Completed'),('FAILED', 'Failed'),)id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)name = models.CharField(max_length=255, unique=True)status = models.CharField(max_length=20, choices=TASK_STATUS, default='PENDING')created_at = models.DateTimeField(auto_now_add=True)updated_at = models.DateTimeField(auto_now=True)result_message = models.TextField(null=True, blank=True)
3.3 異步任務實現
在 tasks.py
中實現異步任務邏輯:
import requests
from celery import shared_task
from .models import AsyncTask, TestCasePreview
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync@shared_task
def generate_test_cases(task_id, field_data):try:# 獲取任務task = AsyncTask.objects.get(id=task_id)task.status = 'PROCESSING'task.save()# 調用阿里云百煉 DeepSeek-R1 APIclient = OpenAI(# 如果沒有配置環境變量,請用百煉API Key替換:api_key="sk-xxx"# api_key='sk-xxx',api_key='sk-712a634dbaa7444d838d20b25eb938xx', # todo 此處需更換base_url="https://dashscope.aliyuncs.com/compatible-mode/v1")reasoning_content = "" # 定義完整思考過程answer_content = "" # 定義完整回復is_answering = False # 判斷是否結束思考過程并開始回復# 創建聊天完成請求completion = client.chat.completions.create(model="deepseek-r1", # 此處以 deepseek-r1 為例,可按需更換模型名稱messages=[{'role': 'user','content': prompt_param}],stream=True,# 解除以下注釋會在最后一個chunk返回Token使用量# stream_options={# "include_usage": True# })print("\n" + "=" * 20 + "思考過程" + "=" * 20 + "\n")for chunk in completion:# 如果chunk.choices為空,則打印usageif not chunk.choices:print("\nUsage:")print(chunk.usage)else:delta = chunk.choices[0].delta# 打印思考過程if hasattr(delta, 'reasoning_content') and delta.reasoning_content != None:print(delta.reasoning_content, end='', flush=True)reasoning_content += delta.reasoning_contentelse:# 開始回復if delta.content != "" and not is_answering:print("\n" + "=" * 20 + "完整回復" + "=" * 20 + "\n")is_answering = True# 打印回復過程print(delta.content, end='', flush=True)answer_content += delta.content# 存儲測試用例到數據庫for field in field_data:TestCasePreview.objects.create(field_name=field["field_name"],field_type=field["field_type"],field_value=field["field_value"],generated_test_case=extract_json_objects(answer_content))# 更新任務狀態task.status = 'COMPLETED'task.result_message = "測試用例生成成功!"task.save()# 通過 WebSocket 通知用戶channel_layer = get_channel_layer()async_to_sync(channel_layer.group_send)(f"task_{task_id}",{"type": "task_status", "message": "任務已完成"})except Exception as e:task.status = 'FAILED'task.result_message = str(e)task.save()
3.4 用戶接口
在 views.py
中提供創建任務的接口:
from django.http import JsonResponse
from .models import AsyncTask
from .tasks import generate_test_casesdef create_task(request):if request.method == 'POST':task_name = request.POST.get('task_name')field_data = request.POST.get('field_data') # JSON 格式的字段列表# 檢查任務名稱是否唯一if AsyncTask.objects.filter(name=task_name).exists():return JsonResponse({"error": "任務名稱已存在"}, status=400)# 創建任務task = AsyncTask.objects.create(name=task_name)# 調用異步任務generate_test_cases.delay(task.id, field_data)return JsonResponse({"message": "任務已創建", "task_id": task.id})
3.5 WebSocket 通知
WebSocket 消費者 consumers.py
:
from channels.generic.websocket import AsyncWebsocketConsumer
import jsonclass TaskConsumer(AsyncWebsocketConsumer):async def connect(self):self.task_id = self.scope['url_route']['kwargs']['task_id']self.task_group_name = f"task_{self.task_id}"await self.channel_layer.group_add(self.task_group_name,self.channel_name)await self.accept()async def disconnect(self, close_code):await self.channel_layer.group_discard(self.task_group_name,self.channel_name)async def task_status(self, event):message = event['message']await self.send(text_data=json.dumps({"message": message}))
3.6 前端頁面展示
創建一個簡單的頁面用于展示測試用例:
<!DOCTYPE html>
<html lang="en">
<head><title>測試用例預覽</title>
</head>
<body><h1>測試用例預覽</h1><table id="test-cases"><thead><tr><th>字段名稱</th><th>字段類型</th><th>字段值</th><th>測試用例</th></tr></thead><tbody><!-- 數據通過 Ajax 加載 --></tbody></table>
</body>
</html>
四、總結
本項目從需求分析出發,采用 Django 框架結合 Celery、Redis 和 Django Channels,完整實現了一個基于異步任務的測試用例生成系統。系統支持任務狀態管理、用戶通知,以及前端數據的增刪改查功能,邏輯清晰,功能完善,非常適合在實際項目中推廣應用。在老舊測試平臺智能化中,異步是不可或缺的整改步驟,調試好異步任務后,后續優化將進入快速迭代階段。加油吧~