如需完整工程文件(含所有模板),可回復獲取詳細模板代碼。
面向人群:自動化測試工程師、企業中后臺開發人員、希望提升效率的 AI 業務從業者
核心收獲:掌握 Django 三表關系設計、Celery 異步任務實踐、基礎 Web 交互與前后端分離思路,源碼可直接落地,方便二次擴展
一、系統功能全覽
本系統以大模型自動化任務管理為核心,涵蓋了三大業務場景:
1. 大模型任務管理
- 場景:測試工程師或業務人員通過 Web 頁面向企業內已接入的大模型(如 GPT、阿里百川等)發起任務請求,系統后臺異步執行任務并實時反饋進度與結果。
- 功能:
- 任務列表:清晰展示任務名稱、創建時間、發起人、提問內容、所用大模型、進度
- 任務詳情:一鍵查看輸入與大模型返回詳情
- 一鍵新建任務:彈窗表單,選擇發起人/模型、填寫提示詞,任務異步下發
2. 大模型管理
- 場景:平臺支持多模型配置、維護,可隨時增刪編輯,靈活應對不同業務需求或供應商切換。
- 功能:
- 大模型列表:展示所有已接入大模型的關鍵信息
- 新建/編輯/刪除大模型:表單支持 API Key、Base URL、模型名、維護人,快速擴展和維護
3. 人員管理
- 場景:平臺中的“發起人”、“維護人”都從人員表下拉選取,方便團隊協作與權限追蹤。
- 功能:
- 列表、創建、編輯、刪除人員,數據基礎一目了然
二、系統架構與擴展性說明
- 異步任務:Celery+Redis 組合,任務下發即返回,處理高并發和長耗時 AI 推理毫無壓力。
- 表單校驗:所有關鍵字段必填,表單錯誤友好提示。
- 數據解耦:大模型、人員、任務三表設計,方便后續增加權限、標簽等擴展字段。
- 易于二次開發:
- 任務可輕松拓展為定時任務、批量任務、API 任務
- 大模型表可加“模型類型”、“狀態”等新字段
- Celery 任務內可真實調用大模型接口,支持異步回調、失敗重試等企業級需求
- 權限與安全:可直接結合 Django 用戶系統,擴展為企業級權限管理后臺
環境準備
-
安裝依賴
pip install django celery redis
-
確保本地 Redis 已啟動(Windows 下可用)
項目結構
bigmodel_proj/
├── bigmodel_proj/
│ ├── __init__.py
│ ├── celery.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── app/
│ ├── __init__.py
│ ├── admin.py
│ ├── apps.py
│ ├── models.py
│ ├── tasks.py
│ ├── views.py
│ ├── forms.py
│ ├── urls.py
│ └── templates/
│ └── app/
│ └── ... # 所有前端模板
└── manage.py
四、核心代碼實現
1. bigmodel_proj/settings.py
(核心配置)
import os
from pathlib import PathBASE_DIR = Path(__file__).resolve().parent.parentSECRET_KEY = 'your-secret-key'
DEBUG = True
ALLOWED_HOSTS = []INSTALLED_APPS = ['django.contrib.admin','django.contrib.auth','django.contrib.contenttypes','django.contrib.sessions','django.contrib.messages','django.contrib.staticfiles','app',
]MIDDLEWARE = ['django.middleware.security.SecurityMiddleware','django.contrib.sessions.middleware.SessionMiddleware','django.middleware.common.CommonMiddleware','django.middleware.csrf.CsrfViewMiddleware','django.contrib.auth.middleware.AuthenticationMiddleware','django.contrib.messages.middleware.MessageMiddleware','django.middleware.clickjacking.XFrameOptionsMiddleware',
]ROOT_URLCONF = 'bigmodel_proj.urls'TEMPLATES = [{'BACKEND': 'django.template.backends.django.DjangoTemplates','DIRS': [BASE_DIR / 'app' / 'templates'],'APP_DIRS': True,'OPTIONS': {'context_processors': ['django.template.context_processors.debug','django.template.context_processors.request','django.contrib.auth.context_processors.auth','django.contrib.messages.context_processors.messages',],},},
]WSGI_APPLICATION = 'bigmodel_proj.wsgi.application'DATABASES = {'default': {'ENGINE': 'django.db.backends.sqlite3','NAME': BASE_DIR / 'db.sqlite3',}
}LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_L10N = True
USE_TZ = TrueSTATIC_URL = '/static/'# Celery 配置
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
2. bigmodel_proj/celery.py
(Celery 啟動入口)
import os
from celery import Celeryos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'bigmodel_proj.settings')
app = Celery('bigmodel_proj')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
bigmodel_proj/__init__.py
:
from .celery import app as celery_app
__all__ = ('celery_app',)
3. app/models.py
(三大核心數據模型)
from django.db import modelsclass Person(models.Model):name = models.CharField(max_length=50, verbose_name="姓名")created_at = models.DateTimeField(auto_now_add=True, verbose_name="創建時間")def __str__(self):return self.nameclass BigModel(models.Model):api_key = models.CharField(max_length=128, verbose_name="API Key")base_url = models.URLField(verbose_name="Base URL")model_name = models.CharField(max_length=100, verbose_name="模型名")maintainer = models.ForeignKey(Person, on_delete=models.SET_NULL, null=True, verbose_name="維護人")created_at = models.DateTimeField(auto_now_add=True, verbose_name="創建時間")updated_at = models.DateTimeField(auto_now=True, verbose_name="修改時間")def __str__(self):return self.model_nameclass BigModelTask(models.Model):STATUS_CHOICES = (('PENDING', '待處理'),('RUNNING', '進行中'),('SUCCESS', '成功'),('FAILURE', '失敗'),)name = models.CharField(max_length=100, verbose_name="任務名稱")creator = models.ForeignKey(Person, on_delete=models.SET_NULL, null=True, verbose_name="發起人")prompt = models.TextField(verbose_name="用戶提問信息")big_model = models.ForeignKey(BigModel, on_delete=models.SET_NULL, null=True, verbose_name="大模型")status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='PENDING', verbose_name="任務進度")created_at = models.DateTimeField(auto_now_add=True, verbose_name="創建時間")detail = models.TextField(blank=True, null=True, verbose_name="大模型請求詳情")def __str__(self):return self.name
4. app/tasks.py
(Celery 異步任務)
from celery import shared_task
import time
from .models import BigModelTask@shared_task(bind=True)
def execute_bigmodel_task(self, task_id):try:task = BigModelTask.objects.get(id=task_id)task.status = 'RUNNING'task.save(update_fields=['status'])# 模擬大模型請求耗時time.sleep(5)# 假裝請求大模型并返回結果result_detail = f"請求大模型[{task.big_model.model_name}]完成,提示詞:{task.prompt}"task.status = 'SUCCESS'task.detail = result_detailtask.save(update_fields=['status', 'detail'])except Exception as e:if task:task.status = 'FAILURE'task.detail = f"異常:{str(e)}"task.save(update_fields=['status', 'detail'])raise
5. app/forms.py
(前端表單)
from django import forms
from