需求:
1.根據用戶編寫的要報規則,去mysql庫里SysManage_Rule表獲取已經啟用的規則作為條件(例如[{“field”: “關鍵詞”, “logic”: “AND”, “value”: “阿爾法”, “operator”: “=”,, “assign_user”: “user222”}])條件即為:關鍵詞=阿爾法
2.根據此條件去lxdb的all_report表進行查詢,查詢邏輯是每10min獲取最新數據,滿足條件的對all_report表的report_handler字段打上分配人名以及yb_importanceid填上要報規則的id
要報規則頁面:
SysManage_Rule表
解決辦法
規則分配腳本(supervisor運行):
import argparse
import os
import django
import time
from datetime import datetime, timedelta
import sys
import json# 添加項目路徑,確保 Django 配置正確
sys.path.append('/home/rpadmin/web/yb-backend')# 初始化 Django 環境
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "yb.settings")
django.setup()# 導入自定義模塊和參數
from DBreport.functions.get_table_mapping import get_gather_table
from wrapper.Params import TOREPORT, GATHER_TABLE
from DBconn.DBfunctions.dynamicQueryBase import DynamicQueryBaseEngine
from SysManage.models import SysmanageRule# 字段名映射:中文字段名 -> 英文字段名
field_mapping = {"報文要素": "text_content","語種": "audio_languagename","關鍵詞": "keyword","文種": "text_lang","實體": "text_entity","來源手段": "hj_means"
}# 初始記錄時間,第一次運行查很早之前的數據
last_run_time = datetime.now() - timedelta(minutes=1600000)def replace_field_names(data, mapping):new_data = {}for user, rules in data.items():new_rules = []for rule in rules:new_rule = rule.copy()if new_rule.get('field') in mapping:new_rule['field'] = mapping[new_rule['field']]new_rules.append(new_rule)new_data[user] = new_rulesreturn new_datadef run_rule_dispatch():global last_run_time# 獲取數據庫連接和數據表對象db_conn = get_gather_table()gather_table = db_conn.load_table(GATHER_TABLE)print("[調試] all_report 字段列表:", [col.name for col in gather_table.columns])# 查詢所有啟用狀態的規則rules = SysmanageRule.objects.filter(rule_status=1)for rule in rules:rule_id = rule.idrule_name = rule.rule_namerule_content = rule.rule_content# 檢查規則內容格式是否為列表if isinstance(rule_content, list):conditions = rule_contentelse:print(f"[跳過] 規則《{rule_name}》內容應為列表格式")continue# 替換條件中的字段名conditions = replace_field_names({rule_name: conditions}, field_mapping)[rule_name]filters = []keyword_value = None # 保存關鍵詞值(如果存在)for cond in conditions:field = cond.get("field")operator = cond.get("operator")value = cond.get("value")if cond.get("assign_user"):assign_user = cond["assign_user"]if not field or not operator or value is None:continue# 如果字段為 keyword 且是等于操作,延遲處理if field == "keyword" and operator == "=":keyword_value = valueelse:filters.append((field, operator, value))# 增加時間過濾:只查上次運行之后新增的數據filters.append(('hj_createtime', '>=', last_run_time))# 關鍵詞特殊處理:模糊匹配 audio_asr 或 trans_sidebyside 的 original_content/ translat_content 字段if keyword_value is not None:# audio_asr 模糊匹配filters_audio = filters + [('audio_asr', 'like', f"%{keyword_value}%")]# 執行原始查詢(不帶關鍵詞)用于后續文本內容篩選raw_trans_records = db_conn.execute_dynamic_query(gather_table,filters,limit=5, # 僅預覽前5條,必要時可調整或移除ignore_fields=[])matched_trans_data = []for row in raw_trans_records['data']:try:trans_data = row.get('trans_sidebyside')if isinstance(trans_data, str):trans_data = json.loads(trans_data) # JSON 反序列化if isinstance(trans_data, dict):original = trans_data.get("original_content", "")translated = trans_data.get("translat_content", "")if keyword_value in original or keyword_value in translated:matched_trans_data.append(row)except Exception:continue# 執行 audio_asr 匹配查詢result1 = db_conn.execute_dynamic_query(gather_table, filters_audio, ignore_fields=[])# 合并兩類結果(根據 lxid 去重)combined_data = {row['lxid']: row for row in result1['data']}for row in matched_trans_data:combined_data.setdefault(row['lxid'], row)total_count = len(combined_data)print(f"[調試] 條件命中記錄數:{total_count}")if total_count:first_row = list(combined_data.values())[0]print("[調試] 命中示例記錄:", first_row)# 構造更新內容updates = {'report_handler': assign_user,'yb_importanceid': str(rule_id),'yb_importancename': rule_name,'assign_rule_type': TOREPORT}# 遍歷命中記錄逐條更新affected = 0for row in combined_data.values():row_filter = [('lxid', '=', row['lxid'])]try:count = DynamicQueryBaseEngine.update_record(db_conn, gather_table, updates, row_filter)affected += countexcept Exception as e:print(f"[×] 更新失敗:{str(e)}")print(f"[?] 規則《{rule_name}》執行成功,分配人:{assign_user},更新{affected}條")continue# 正常流程處理:無關鍵詞匹配邏輯print(f"[調試] 規則《{rule_name}》篩選條件:{filters}")result_data = db_conn.execute_dynamic_query(gather_table,filters,limit=5,ignore_fields=[])print(f"[調試] 條件命中記錄數:{result_data['total_count']}")if result_data['data']:print("[調試] 命中示例記錄:", result_data['data'][0])updates = {'report_handler': assign_user,'yb_importanceid': str(rule_id),'yb_importancename': rule_name,'assign_rule_type': TOREPORT}try:affected = DynamicQueryBaseEngine.update_record(db_conn, gather_table, updates, filters)print(f"[?] 規則《{rule_name}》執行成功,分配人:{assign_user},更新{affected}條")except Exception as e:print(f"[×] 規則《{rule_name}》執行失敗:{str(e)}")# 關閉連接,記錄當前時間用于下輪過濾db_conn.close()last_run_time = datetime.now()if __name__ == "__main__":while True:print(f"\n[調試] 開始執行調度任務,當前時間:{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")run_rule_dispatch()print(f"[調試] 任務執行完成,等待10分鐘...\n")time.sleep(600) # 每10分鐘運行一次
指定分配人接口(即向rule_content字段里添加assign_user):
@class_operation_logger(operation_name="/指定要報規則分配人")
class RuleAssignUserUpdateAPIView(APIView):"""接收 assign_user 和 規則id,指定規則的 rule_content 中每一項的 assign_userPOST /api/sys/update-assign-user/Body:{"id": 2,"assign_user": "user002"}"""def post(self, request, *args, **kwargs):rule_id = request.data.get("id")assign_user = request.data.get("assign_user")if not rule_id or not assign_user:return ErrorResponse(data=False, msg="參數id 和 assign_user 都是必填的")# 獲取要更新的規則rule = get_object_or_404(SysmanageRule, pk=rule_id)try:content_list = rule.rule_content or []if not isinstance(content_list, list):raise ValueError("rule_content 必須是列表")except Exception as e:return ErrorResponse(data=False, msg=f"讀取 rule_content 失敗:{str(e)}")# 創建或更新每一項的 assign_userfor cond in content_list:cond['assign_user'] = assign_user# 保存回庫rule.rule_content = content_listrule.save(update_fields=['rule_content'])return SuccessResponse(msg=f"id為 {rule_id} 的規則的分配人已更新為 {assign_user}")
Supervisor工具詳見文章Supervisor進程管理