I. Features and Installation
1. Model URL
The model is a high-precision address element recognition model developed by Alibaba.
https://modelscope.cn/models/iic/mgeo_geographic_elements_tagging_chinese_base/summary
Note: do not hand-pick packages yourself; you will not be able to resolve the dependencies. Install the package below exactly as the official docs require. For large models, a conda environment works best.
pip install "modelscope[nlp]" -f https://modelscope.oss-cn-beijing.aliyuncs.com/releases/repo.html
2. Feature Overview
Following the official example:
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks

task = Tasks.token_classification
model = 'iic/mgeo_geographic_elements_tagging_chinese_base'
inputs = '浙江省杭州市余杭區阿里巴巴西溪園區'
pipeline_ins = pipeline(task=task, model=model)
print(pipeline_ins(input=inputs))
# Output
# {'output': [{'type': 'prov', 'start': 0, 'end': 3, 'span': '浙江省'}, {'type': 'city', 'start': 3, 'end': 6, 'span': '杭州市'}, {'type': 'district', 'start': 6, 'end': 9, 'span': '余杭區'}, {'type': 'poi', 'start': 9, 'end': 17, 'span': '阿里巴巴西溪園區'}]}
II. Problems to Solve
1. Integration with tabular data.
2. How to accelerate inference. The model supports both CPU and GPU; for CPU use, optimizations are needed to improve inference speed.
2.1. Loop optimization
2.2. Caching
2.3. Data types: the pandas category dtype (see the sketch after this list)
2.4. Switching some columns to NumPy
2.5. Batched inference
2.6. Model quantization (ineffective on CPU)
2.7. The model already parallelizes internally, so no extra parallelization is needed.
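For item 2.3, here is a minimal, standalone sketch of the idea; the data volumes and values are illustrative only. Columns holding many repeated short strings, such as district names, shrink dramatically when stored as the category dtype:

import pandas as pd

# Illustrative data: many repeated district names
s = pd.Series(['余杭區'] * 100_000 + ['西湖區'] * 100_000)
print(s.memory_usage(deep=True))                     # object dtype: large
print(s.astype('category').memory_usage(deep=True))  # category dtype: a small fraction of that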
(I) Row-by-row inference
This approach is accurate but slow.
1. Small datasets: use pandas to read the DataFrame row by row, then write back to Excel
Easy to understand.
import pandas as pd
import warnings
import os
import time
# Ignore all warnings
warnings.filterwarnings("ignore")
# Silence ModelScope's verbose logging
os.environ["MODELSCOPE_LOG_LEVEL"] = "40"

# Import the ModelScope modules
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks

# Preload the model as a global, so each function call does not reload it
TASK = Tasks.token_classification
MODEL = 'iic/mgeo_geographic_elements_tagging_chinese_base'
PIPELINE = pipeline(task=TASK, model=MODEL)

def process_address_data(texts):
    """Run geographic element recognition on a batch of address texts
    using the preloaded MGeo model.

    Args:
        texts (list): address strings to process
    Returns:
        list: per-sample predictions (label + span)
    """
    results = []
    for text in texts:
        # Make sure the input is a string
        if not isinstance(text, str):
            text = str(text)
        # Run inference with the preloaded model
        try:
            result = PIPELINE(input=text)
            results.append(result['output'])
        except Exception:
            results.append([])  # return an empty result on error
    return results

def extract_geo_entities(results):
    """Extract the requested geographic entity types from the model
    output and return them as one joined string per type.

    Args:
        results (list): model outputs, one list of dicts per sample
    Returns:
        dict: joined entity strings for each entity type
    """
    entity_dict = {
        'district': [],   # district
        'town': [],       # town / sub-district
        'community': [],  # community
        'poi': [],        # residential compound / POI
        'houseno': []     # building number, etc.
    }
    for item_list in results:
        for key in entity_dict:
            entities = [item['span'] for item in item_list if item.get('type') == key]
            entity_dict[key].append(";".join(entities) if entities else "")
    return entity_dict

def process_excel_file(file_path, input_col, output_file=None):
    """Read an Excel file, run the address column through the model,
    and write the extracted geographic fields into new columns.

    Args:
        file_path (str): Excel file path
        input_col (str): name of the column to process
        output_file (str): output path (defaults to overwriting the input)
    Returns:
        pd.DataFrame: DataFrame with the added columns
    """
    # Read the Excel file
    df = pd.read_excel(file_path)
    # Force the input column to string
    df[input_col] = df[input_col].astype(str)
    # Collect the texts
    texts = df[input_col].tolist()
    # Run inference
    results = process_address_data(texts)
    # Extract the geographic entities
    geo_entities = extract_geo_entities(results)
    # Add the new columns to the DataFrame
    for key in geo_entities:
        df[key] = geo_entities[key]
    # Write back to Excel
    if output_file is None:
        output_file = file_path  # overwrite the input by default
    df.to_excel(output_file, index=False)
    return df

# Example usage
if __name__ == "__main__":
    file_path = r'C:\Users\xueshifeng\Desktop\模板 - 副本1.xlsx'  # replace with your actual path
    input_col = '工單內容'  # replace with the column you want to process
    output_file = r'C:\Users\xueshifeng\Desktop\處理結果.xlsx'
    t1 = time.time()
    df_result = process_excel_file(file_path, input_col, output_file)
    print(time.time() - t1)
2. Large datasets: optimize with NumPy
Speed comes from NumPy vectorization, which replaces Python loops, and from preallocating memory.
import pandas as pd
import numpy as np
import os
import time
import warnings
from functools import lru_cache
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks

# Environment configuration (unchanged)
warnings.filterwarnings("ignore")
os.environ["MODELSCOPE_LOG_LEVEL"] = "40"

# Predefined constants
GEO_ENTITY_TYPES = ('district', 'town', 'community', 'poi', 'houseno')
VALID_ENTITY_TYPES = frozenset(GEO_ENTITY_TYPES)
MODEL_NAME = 'iic/mgeo_geographic_elements_tagging_chinese_base'

# Model loading (unchanged)
pipeline_instance = pipeline(
    task=Tasks.token_classification,
    model=MODEL_NAME
)

@lru_cache(maxsize=128)
def cached_inference(text: str) -> list:
    """Cached inference function."""
    try:
        return pipeline_instance(input=text)['output'] or []
    except Exception:
        return []

def batch_processing(texts: np.ndarray) -> list:
    # Preallocate the result list instead of appending in a loop
    texts_flat = texts.ravel()
    n = texts_flat.size
    results = [None] * n
    for i in range(n):
        text = str(texts_flat[i])
        results[i] = cached_inference(text)
    return results

def optimized_entity_extraction(results: list) -> dict:
    n_samples = len(results)
    entity_store = {etype: [[] for _ in range(n_samples)] for etype in GEO_ENTITY_TYPES}
    for idx, entities in enumerate(results):
        if isinstance(entities, list):
            for ent in entities:
                if (etype := ent.get('type')) in VALID_ENTITY_TYPES:
                    entity_store[etype][idx].append(ent.get('span', ''))
    # Return the joined strings as category Series to save memory
    return {
        etype: pd.Series(
            [';'.join(lst) if lst else '' for lst in entity_store[etype]],
            dtype='category'
        )
        for etype in GEO_ENTITY_TYPES
    }

def process_excel_data(file_path: str, input_col: str, output_file: str = None) -> pd.DataFrame:
    # Read the data
    df = pd.read_excel(file_path, engine='openpyxl')
    # Force the input column to string
    df[input_col] = df[input_col].astype(str)
    # Core processing
    input_series = df[input_col].values.astype(np.str_, copy=False)
    raw_results = batch_processing(input_series)
    geo_data = optimized_entity_extraction(raw_results)
    # Merge the results
    for col_name, series in geo_data.items():
        df[col_name] = series
    # Write the output
    output_path = output_file or file_path
    df.to_excel(output_path, index=False, engine='openpyxl')
    return df

# Example usage (unchanged)
if __name__ == "__main__":
    input_file = r"D:\data\gn\結果v68.xlsx"
    output_file = r"D:\data\gn\結果v68_optimized.xlsx"
    t_start = time.perf_counter()
    result_df = process_excel_data(input_file, '工單內容', output_file)
    elapsed = time.perf_counter() - t_start
    print(f"Done. Total time: {elapsed:.2f}s")
(II) Improving run speed
There are two bottlenecks: model inference speed and heavy data manipulation.
1. Faster inference by reducing the number of model calls: because the model handles inputs of up to 128 characters with high precision, we merge several rows and run them through the model in a single call. The difficulty is telling which Excel row each result belongs to.
We use the 'start'/'end' offsets in the output (e.g. 'start': 0, 'end': 3), i.e. the position of each recognized span within the merged input, to derive its position in the original text and hence its source row.
# Output
# {'output': [{'type': 'prov', 'start': 0, 'end': 3, 'span': '浙江省'}, {'type': 'city', 'start': 3, 'end': 6, 'span': '杭州市'}, {'type': 'district', 'start': 6, 'end': 9, 'span': '余杭區'}, {'type': 'poi', 'start': 9, 'end': 17, 'span': '阿里巴巴西溪園區'}]}
This is implemented by the entity-splitting function below.
def split_entities_by_line(
    result: List[Dict[str, Any]],
    original_lines: List[str],
    actual_batch_size: int = BATCH_SIZE
) -> List[List[Dict[str, Any]]]:
    """Entity-splitting function (with corrected offset computation)."""
    if not result:  # handle an empty result
        return [[] for _ in range(actual_batch_size)]
    # Precompute line offsets (separator included)
    line_lengths = [len(line) for line in original_lines]  # length of each line
    sep_len = len(SEPARATOR)  # separator length
    # Starting offset of every line
    line_offsets = [0]
    for i in range(len(original_lines) - 1):
        line_offsets.append(line_offsets[-1] + line_lengths[i] + sep_len)  # accumulate offsets
    # Assign entities to lines (with debug output)
    split_result = [[] for _ in range(actual_batch_size)]
    for entity in result:
        if not isinstance(entity, dict):
            continue
        start_pos = entity.get('start', 0)
        # Binary search for the owning line (10x+ faster than a linear scan)
        line_idx = np.searchsorted(line_offsets, start_pos, side='right') - 1
        if 0 <= line_idx < actual_batch_size:
            # Convert to line-relative positions
            adjusted_entity = {
                'type': entity.get('type', ''),
                'span': entity.get('span', ''),
                'start': start_pos - line_offsets[line_idx],  # relative start within the line
                'end': entity.get('end', 0) - line_offsets[line_idx]
            }
            split_result[line_idx].append(adjusted_entity)
            # Debug output
            print(f"Assigned entity: {entity['span']} -> line {line_idx}")
    return split_result
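To make the offset bookkeeping concrete, here is a minimal, self-contained walk-through. The two input rows and the entity dicts are hand-made for illustration, not real model output:

import numpy as np

SEPARATOR = '\ue000'
lines = ['浙江省杭州市余杭區', '余杭區五常街道']
merged = SEPARATOR.join(lines)  # line 0 starts at offset 0, line 1 at 9 + 1 = 10

# Hand-made entities with offsets as they would appear in the merged text
entities = [
    {'type': 'district', 'start': 6, 'end': 9, 'span': '余杭區'},
    {'type': 'town', 'start': 13, 'end': 17, 'span': '五常街道'},
]

line_offsets = [0, len(lines[0]) + len(SEPARATOR)]  # [0, 10]
for ent in entities:
    row = int(np.searchsorted(line_offsets, ent['start'], side='right') - 1)
    print(row, ent['span'], ent['start'] - line_offsets[row])
# 0 余杭區 6   -> belongs to row 0, relative start 6
# 1 五常街道 3 -> belongs to row 1, relative start 3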
2. Heavy data manipulation
The main techniques are:
2.1. Using NumPy instead of pandas, with vectorized operations instead of iterating over the DataFrame.
2.2. Using binary search instead of linear scans.
3. Preloading the model and suppressing warnings and logs. Preloading in particular, rather than loading the model inside the function on every call, cuts runtime dramatically.
4. Caching, so duplicate texts are not re-inferred.
The final code is as follows:
import numpy as np
import pandas as pd
import os
import time
from functools import lru_cache
from typing import List, Dict, Any, Optional
from collections import defaultdict
import warnings

# Environment configuration
os.environ["MODELSCOPE_LOG_LEVEL"] = "40"
warnings.filterwarnings("ignore")

# Model initialization
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks

TASK = Tasks.token_classification
MODEL = 'iic/mgeo_geographic_elements_tagging_chinese_base'
PIPELINE = pipeline(task=TASK, model=MODEL)

# Constants
GEO_ENTITY_TYPES = ['district', 'town', 'community', 'poi', 'houseno']
BATCH_SIZE = 3  # start small for validation, then increase
CACHE_SIZE = 128
SEPARATOR = '\ue000'  # private-use Unicode character (U+E000)

@lru_cache(maxsize=CACHE_SIZE)
def cached_pipeline(text: str) -> List[Dict[str, Any]]:
    """Cached model inference (normalizes the output structure)."""
    if not text.strip():
        return []
    try:
        raw_output = PIPELINE(input=text)
        # Handle either output structure the pipeline may return
        if isinstance(raw_output, dict):
            return raw_output.get('output', [])
        elif isinstance(raw_output, list):
            return raw_output
        return []
    except Exception as e:
        print(f"Model inference failed: {str(e)}")
        return []

def split_entities_by_line(
    result: List[Dict[str, Any]],
    original_lines: List[str],
    actual_batch_size: int = BATCH_SIZE
) -> List[List[Dict[str, Any]]]:
    """Entity-splitting function (with corrected offset computation)."""
    if not result:
        return [[] for _ in range(actual_batch_size)]
    # Precompute line offsets (separator included)
    line_lengths = [len(line) for line in original_lines]
    sep_len = len(SEPARATOR)
    line_offsets = [0]
    for i in range(len(original_lines) - 1):
        line_offsets.append(line_offsets[-1] + line_lengths[i] + sep_len)
    # Assign entities to lines
    split_result = [[] for _ in range(actual_batch_size)]
    for entity in result:
        if not isinstance(entity, dict):
            continue
        start_pos = entity.get('start', 0)
        # Binary search for the owning line
        line_idx = np.searchsorted(line_offsets, start_pos, side='right') - 1
        if 0 <= line_idx < actual_batch_size:
            # Convert to line-relative positions
            adjusted_entity = {
                'type': entity.get('type', ''),
                'span': entity.get('span', ''),
                'start': start_pos - line_offsets[line_idx],
                'end': entity.get('end', 0) - line_offsets[line_idx]
            }
            split_result[line_idx].append(adjusted_entity)
    return split_result

def process_entities(
    all_results: List[List[Dict]],
    entity_types: List[str]
) -> Dict[str, List[str]]:
    """Aggregate entities per row and per entity type."""
    entity_records = {k: defaultdict(set) for k in entity_types}
    for row_idx, row in enumerate(all_results):
        for entity in row:
            if isinstance(entity, dict) and (etype := entity.get('type')) in entity_types:
                entity_records[etype][row_idx].add(entity.get('span', ''))
    geo_data = {k: [] for k in entity_types}
    for row_idx in range(len(all_results)):
        for etype in entity_types:
            values = entity_records[etype].get(row_idx, set())
            geo_data[etype].append(';'.join(values) if values else '')
    return geo_data

def process_excel_file(
    file_path: str,
    input_col: str,
    output_file: Optional[str] = None
) -> pd.DataFrame:
    """Main pipeline."""
    try:
        df = pd.read_excel(file_path, engine='openpyxl', dtype={input_col: str})
        raw_texts = df[input_col].to_numpy()
        total_rows = len(raw_texts)
    except Exception as e:
        raise ValueError(f"Failed to read Excel: {e}")
    all_results = []
    batch_indices = np.arange(0, total_rows, BATCH_SIZE)
    for start_idx in batch_indices:
        end_idx = min(start_idx + BATCH_SIZE, total_rows)
        batch_texts = raw_texts[start_idx:end_idx]
        actual_batch_size = len(batch_texts)
        merged_text = SEPARATOR.join(batch_texts.tolist())
        try:
            model_output = cached_pipeline(merged_text)
            split_result = split_entities_by_line(model_output, batch_texts.tolist(), actual_batch_size)
            all_results.extend(split_result)
        except Exception as e:
            print(f"Error processing rows {start_idx}-{end_idx}: {e}")
            all_results.extend([[] for _ in range(actual_batch_size)])
    all_results = all_results[:total_rows]
    geo_data = process_entities(all_results, GEO_ENTITY_TYPES)
    for col in GEO_ENTITY_TYPES:
        df[col] = geo_data[col]
    try:
        output_path = output_file or file_path
        df.to_excel(output_path, index=False, engine='openpyxl')
        return df
    except Exception as e:
        raise IOError(f"Failed to save results: {e}")

if __name__ == "__main__":
    file_path = r"D:\data\gn\結果v68.xlsx"
    output_file = r"D:\data\gn\結果v68_1.xlsx"
    try:
        t_start = time.time()
        result_df = process_excel_file(file_path, "工單內容", output_file)
        print(f"Done. Total time: {time.time() - t_start:.2f}s")
    except Exception as e:
        print(f"Run failed: {str(e)}")
III. Miscellaneous
1. A version with a larger batch size (multi-row inference) but without the NumPy vectorization; the code is easier to follow:
import pandas as pd
import os
import time
from functools import lru_cache
from typing import List, Dict, Any, Optional
import warnings

# Environment configuration: ignore all warnings
warnings.filterwarnings("ignore")
# Silence ModelScope's verbose logging
os.environ["MODELSCOPE_LOG_LEVEL"] = "40"

# Model initialization
from modelscope.pipelines import pipeline
from modelscope.utils.constant import Tasks
TASK = Tasks.token_classification
MODEL = 'iic/mgeo_geographic_elements_tagging_chinese_base'
PIPELINE = pipeline(task=TASK, model=MODEL)

# Constants
GEO_ENTITY_TYPES = ['district', 'town', 'community', 'poi', 'houseno']
BATCH_SIZE = 3    # number of rows merged per inference call
CACHE_SIZE = 128  # fixed cache size

@lru_cache(maxsize=CACHE_SIZE)
def cached_pipeline(text: str) -> List[Dict[str, Any]]:
    """Cached model inference (fixed cache of 128 entries)."""
    if not text.strip():
        return []
    try:
        return PIPELINE(input=text).get('output', [])
    except Exception as e:
        print(f"Model inference failed: {str(e)}")
        return []

def split_results_by_prov(
    result: List[Dict[str, Any]],
    original_lines: List[str]
) -> List[List[Dict[str, Any]]]:
    """Split the model result back to the original rows.

    Args:
        result: model output
        original_lines: the original batch of lines
    """
    # Input validation and padding
    if not original_lines:
        return [[] for _ in range(BATCH_SIZE)]
    # Always pad to BATCH_SIZE lines (fill with empty strings)
    padded_lines = original_lines + [''] * (BATCH_SIZE - len(original_lines))
    # Compute character ranges (accounting for the space separator)
    line_ranges = []
    current_pos = 0
    for line in padded_lines:
        line_end = current_pos + len(line)
        line_ranges.append((current_pos, line_end))
        current_pos = line_end + 1  # the separating space
    # Assign entities to their rows
    split_result = [[] for _ in range(BATCH_SIZE)]
    for entity in result:
        if not isinstance(entity, dict) or 'start' not in entity:
            continue
        # Find the row the entity belongs to
        for line_idx, (start, end) in enumerate(line_ranges):
            if start <= entity['start'] < end:
                split_result[line_idx].append(entity)
                break
        else:
            # Unmatched entities default to the first row
            split_result[0].append(entity)
    return split_result

def process_excel_file(
    file_path: str,
    input_col: str,
    output_file: Optional[str] = None
) -> pd.DataFrame:
    """Main processing function (single-process version).

    Args:
        file_path: input Excel path
        input_col: name of the column to process
        output_file: output path (None overwrites the input file)
    """
    # Read the data
    try:
        df = pd.read_excel(file_path, engine='openpyxl')
        df[input_col] = df[input_col].astype(str)
    except Exception as e:
        raise ValueError(f"Failed to read Excel: {e}")
    # Batched processing
    all_results = []
    for i in range(0, len(df), BATCH_SIZE):
        # Grab the next batch of original rows
        original_lines = df[input_col].iloc[i:i + BATCH_SIZE].tolist()
        merged_text = ' '.join(original_lines)
        try:
            # One model call per batch
            model_output = cached_pipeline(merged_text)
            # Split the result back to the original rows
            split_result = split_results_by_prov(model_output, original_lines)
            all_results.extend(split_result)
        except Exception as e:
            print(f"Error processing rows {i}-{i + BATCH_SIZE}: {e}")
            all_results.extend([[] for _ in range(BATCH_SIZE)])
    # Align results with the original row count
    all_results = all_results[:len(df)]
    # Extract the geographic entities
    geo_data = {k: [] for k in GEO_ENTITY_TYPES}
    for row in all_results:
        row_entities = {k: set() for k in GEO_ENTITY_TYPES}
        for entity in row:
            if isinstance(entity, dict) and entity.get('type') in row_entities:
                row_entities[entity['type']].add(entity.get('span', ''))
        for k in GEO_ENTITY_TYPES:
            geo_data[k].append(';'.join(filter(None, row_entities[k])))
    # Add the result columns
    for col in GEO_ENTITY_TYPES:
        df[col] = geo_data[col]
    # Write the output
    try:
        output_path = output_file if output_file else file_path
        df.to_excel(output_path, index=False, engine='openpyxl')
        return df
    except Exception as e:
        raise IOError(f"Failed to write results: {e}")

if __name__ == "__main__":
    path = r"D:\data\gn\結果v68.xlsx"
    output_file = r"D:\data\gn\結果v68_1.xlsx"
    try:
        t_start = time.time()
        result_df = process_excel_file(
            file_path=path,
            input_col="工單內容",
            output_file=output_file
        )
        print(f"Done. Time: {time.time() - t_start:.2f}s")
    except Exception as e:
        print(f"Run failed: {str(e)}")
2. Model quantization
from modelscope.models import Model
import torch
import os

# 1. Local model path (must contain config.json, pytorch_model.bin, etc.)
local_model_path = r"D:\data\2025\Data\geo\iic\mgeo_geographic_elements_tagging_chinese_base"

# 2. Make sure the path exists and contains the model files
if not os.path.exists(local_model_path):
    raise FileNotFoundError(f"Model path does not exist: {local_model_path}")

# 3. Load the model from local files
model = Model.from_pretrained(local_model_path, local_files_only=True)
print("Model loaded successfully!")

# 4. Target save path
save_path = r'D:\data\2025\Data\geoq\iic'

# 5. Create the target directory if it does not exist
os.makedirs(save_path, exist_ok=True)

# 6. Save the model weights (recommended: parameters only)
model_weight_path = os.path.join(save_path, 'pytorch_model.bin')
torch.save(model.state_dict(), model_weight_path)
print(f"Model parameters saved to: {model_weight_path}")

# 7. Or save the whole model (architecture + parameters)
full_model_path = os.path.join(save_path, 'full_model.pt')
torch.save(model, full_model_path)
print(f"Full model saved to: {full_model_path}")
Note:
After quantization, the output directory contains only two files; the remaining model files must be copied over manually.
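A minimal sketch of that manual copy step, reusing the paths from the script above and assuming the remaining files (config, tokenizer, etc.) sit next to the original weights:

import os
import shutil

src = r"D:\data\2025\Data\geo\iic\mgeo_geographic_elements_tagging_chinese_base"
dst = r"D:\data\2025\Data\geoq\iic"
for name in os.listdir(src):
    if name != 'pytorch_model.bin':  # the weights were already re-saved above
        shutil.copy2(os.path.join(src, name), os.path.join(dst, name))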
Why quantization does not help on CPU:
Weak CPU support for INT8. Most older CPUs (e.g. Intel Skylake and earlier) lack INT8 acceleration instructions such as AVX512-VNNI, so INT8 operations are emulated in software, which adds overhead instead of removing it.
Dynamic quantization does not cover the key layers. It typically quantizes only Linear layers, while nonlinear layers (Softmax, LayerNorm, etc.) still run in FP32; if those dominate the model, the overall speedup is limited. Dynamically quantizing activations also introduces extra overhead.
Batch size too small. Small batches cannot exploit INT8 parallelism; increase the batch size (e.g. batch_size >= 8).
Framework maturity. PyTorch dynamic quantization may not dispatch to optimized kernels; deploy with ONNX Runtime or TensorRT, or upgrade PyTorch.
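For reference, the dynamic quantization discussed above is a single call in PyTorch. A minimal sketch, assuming the `model` loaded in the script above behaves as a standard torch.nn.Module; note that it converts only Linear layers, which is exactly the limitation described in the second point:

import torch

# Dynamic INT8 quantization: only Linear layers get INT8 weights;
# Softmax, LayerNorm, etc. still run in FP32.
quantized_model = torch.quantization.quantize_dynamic(
    model,
    {torch.nn.Linear},
    dtype=torch.qint8
)
torch.save(quantized_model.state_dict(), 'pytorch_model_int8.bin')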
3. Preprocessing
To fit more rows within the 128-character input limit per inference call, the raw data can first be shortened with regular expressions.
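A minimal sketch of such a preprocessing step; which patterns are safe to strip is entirely data-dependent, so the rules below are illustrative assumptions only:

import re

def shorten_address(text: str, limit: int = 128) -> str:
    """Strip characters that carry no address information before inference."""
    text = re.sub(r'\s+', '', text)            # drop whitespace
    text = re.sub(r'[【】\[\]()()]', '', text)  # drop bracket characters
    return text[:limit]                        # hard cap at the model's input limit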