RAG開源項目Qanything源碼閱讀3-在線推理

原文:前沿重器[47] | RAG開源項目Qanything源碼閱讀3-在線推理
項目:https://github.com/netease-youdao/QAnything
第一篇:RAG開源項目Qanything源碼閱讀1-概述+服務
第二篇:RAG開源項目Qanything源碼閱讀2-離線文件處理


0,推理大概流程

  • 檢索&粗排
  • 精排
  • 檢索文檔后處理
  • prompt和請求大模型

1,外部服務

回顧一下在“前沿重器[45] RAG開源項目Qanything源碼閱讀1-概述+服務”中提到的服務核心文件,所有的接口都是在qanything_kernel\qanything_server\sanic_api.py里面啟動的:

app.add_route(document, "/api/docs", methods=['GET'])
app.add_route(new_knowledge_base, "/api/local_doc_qa/new_knowledge_base", methods=['POST'])  # tags=["新建知識庫"]
app.add_route(upload_weblink, "/api/local_doc_qa/upload_weblink", methods=['POST'])  # tags=["上傳網頁鏈接"]
app.add_route(upload_files, "/api/local_doc_qa/upload_files", methods=['POST'])  # tags=["上傳文件"] 
app.add_route(local_doc_chat, "/api/local_doc_qa/local_doc_chat", methods=['POST'])  # tags=["問答接口"] 
app.add_route(list_kbs, "/api/local_doc_qa/list_knowledge_base", methods=['POST'])  # tags=["知識庫列表"] 
app.add_route(list_docs, "/api/local_doc_qa/list_files", methods=['POST'])  # tags=["文件列表"]
app.add_route(get_total_status, "/api/local_doc_qa/get_total_status", methods=['POST'])  # tags=["獲取所有知識庫狀態"]
app.add_route(clean_files_by_status, "/api/local_doc_qa/clean_files_by_status", methods=['POST'])  # tags=["清理數據庫"]
app.add_route(delete_docs, "/api/local_doc_qa/delete_files", methods=['POST'])  # tags=["刪除文件"] 
app.add_route(delete_knowledge_base, "/api/local_doc_qa/delete_knowledge_base", methods=['POST'])  # tags=["刪除知識庫"] 
app.add_route(rename_knowledge_base, "/api/local_doc_qa/rename_knowledge_base", methods=['POST'])  # tags=["重命名知識庫"]

而推理,就是這里的local_doc_chat,直接看這個函數,就在qanything_kernel\qanything_server\handler.py里面。

async def local_doc_chat(req: request):local_doc_qa: LocalDocQA = req.app.ctx.local_doc_qauser_id = safe_get(req, 'user_id')if user_id is None:return sanic_json({"code": 2002, "msg": f'輸入非法!request.json:{req.json},請檢查!'})is_valid = validate_user_id(user_id)if not is_valid:return sanic_json({"code": 2005, "msg": get_invalid_user_id_msg(user_id=user_id)})debug_logger.info('local_doc_chat %s', user_id)kb_ids = safe_get(req, 'kb_ids')question = safe_get(req, 'question')rerank = safe_get(req, 'rerank', default=True)debug_logger.info('rerank %s', rerank)streaming = safe_get(req, 'streaming', False)history = safe_get(req, 'history', [])debug_logger.info("history: %s ", history)debug_logger.info("question: %s", question)debug_logger.info("kb_ids: %s", kb_ids)debug_logger.info("user_id: %s", user_id)not_exist_kb_ids = local_doc_qa.milvus_summary.check_kb_exist(user_id, kb_ids)if not_exist_kb_ids:return sanic_json({"code": 2003, "msg": "fail, knowledge Base {} not found".format(not_exist_kb_ids)})file_infos = []milvus_kb = local_doc_qa.match_milvus_kb(user_id, kb_ids)for kb_id in kb_ids:file_infos.extend(local_doc_qa.milvus_summary.get_files(user_id, kb_id))valid_files = [fi for fi in file_infos if fi[2] == 'green']if len(valid_files) == 0:return sanic_json({"code": 200, "msg": "當前知識庫為空,請上傳文件或等待文件解析完畢", "question": question,"response": "All knowledge bases {} are empty or haven't green file, please upload files".format(kb_ids), "history": history, "source_documents": [{}]})else:debug_logger.info("streaming: %s", streaming)if streaming:debug_logger.info("start generate answer")async def generate_answer(response):debug_logger.info("start generate...")for resp, next_history in local_doc_qa.get_knowledge_based_answer(query=question, milvus_kb=milvus_kb, chat_history=history, streaming=True, rerank=rerank):chunk_data = resp["result"]if not chunk_data:continuechunk_str = chunk_data[6:]if chunk_str.startswith("[DONE]"):source_documents = []for inum, doc in enumerate(resp["source_documents"]):source_info = {'file_id': doc.metadata['file_id'],'file_name': doc.metadata['file_name'],'content': doc.page_content,'retrieval_query': doc.metadata['retrieval_query'],'score': str(doc.metadata['score'])}source_documents.append(source_info)retrieval_documents = format_source_documents(resp["retrieval_documents"])source_documents = format_source_documents(resp["source_documents"])chat_data = {'user_info': user_id, 'kb_ids': kb_ids, 'query': question, 'history': history,'prompt': resp['prompt'], 'result': next_history[-1][1],'retrieval_documents': retrieval_documents, 'source_documents': source_documents}qa_logger.info("chat_data: %s", chat_data)debug_logger.info("response: %s", chat_data['result'])stream_res = {"code": 200,"msg": "success","question": question,# "response":next_history[-1][1],"response": "","history": next_history,"source_documents": source_documents,}else:chunk_js = json.loads(chunk_str)delta_answer = chunk_js["answer"]stream_res = {"code": 200,"msg": "success","question": "","response": delta_answer,"history": [],"source_documents": [],}await response.write(f"data: {json.dumps(stream_res, ensure_ascii=False)}\n\n")if chunk_str.startswith("[DONE]"):await response.eof()await asyncio.sleep(0.001)response_stream = ResponseStream(generate_answer, content_type='text/event-stream')return response_streamelse:for resp, history in local_doc_qa.get_knowledge_based_answer(query=question, milvus_kb=milvus_kb, chat_history=history, streaming=False, rerank=rerank):passretrieval_documents = format_source_documents(resp["retrieval_documents"])source_documents = format_source_documents(resp["source_documents"])chat_data = {'user_id': user_id, 'kb_ids': kb_ids, 'query': question, 'history': history,'retrieval_documents': retrieval_documents, 'prompt': resp['prompt'], 'result': resp['result'],'`': source_documents}qa_logger.info("chat_data: %s", chat_data)debug_logger.info("response: %s", chat_data['result'])return sanic_json({"code": 200, "msg": "success chat", "question": question, "response": resp["result"],"history": history, "source_documents": source_documents})

上面代碼的重點內容:

  • 首先因為是正式項目,在鑒權、數據庫檢測上都做了很多健壯性的處理,例如,對user_id的判別、對數據庫及其對應用戶的權限判別check_kb_exist,再者還有知識庫的判空等。
  • 此處有區分是否使用了流式streaming
  • 最終結果的輸出有進行結構化,結構化這事的業務代碼專門弄了個函數format_source_documents
  • 這里區分了retrieval_documentssource_documents,兩者有所區別,在后面展開聊關鍵算法流程的時候會展開講。
  • get_knowledge_based_answer是內部獲取知識點并進行生成的關鍵函數,就是上一條所說的關鍵算法流程。
# qanything_kernel\utils\general_utils.py def format_source_documents(ori_source_documents):source_documents = []for inum, doc in enumerate(ori_source_documents):# for inum, doc in enumerate(answer_source_documents):# doc_source = doc.metadata['source']file_id = doc.metadata['file_id']file_name = doc.metadata['file_name']# source_str = doc_source if isURL(doc_source) else os.path.split(doc_source)[-1]source_info = {'file_id': doc.metadata['file_id'],'file_name': doc.metadata['file_name'],'content': doc.page_content,'retrieval_query': doc.metadata['retrieval_query'],'kernel': doc.metadata['kernel'],'score': str(doc.metadata['score']),'embed_version': doc.metadata['embed_version']}source_documents.append(source_info)return source_documents

2,RAG推理流程

get_knowledge_based_answer的函數很簡單,不過單獨拿出來,對可讀性是有挺大幫助的。
RAG說白了就是先搜后交給大模型生成,終于講到這段代碼了,流程在這里qanything_kernel\core\local_doc_qa.py

   
# qanything_kernel\core\local_doc_qa.py
@get_timedef get_knowledge_based_answer(self, query, milvus_kb, chat_history=None, streaming: bool = STREAMING,rerank: bool = False):if chat_history is None:chat_history = []retrieval_queries = [query]source_documents = self.get_source_documents(retrieval_queries, milvus_kb)deduplicated_docs = self.deduplicate_documents(source_documents)retrieval_documents = sorted(deduplicated_docs, key=lambda x: x.metadata['score'], reverse=True)if rerank and len(retrieval_documents) > 1:debug_logger.info(f"use rerank, rerank docs num: {len(retrieval_documents)}")retrieval_documents = self.rerank_documents(query, retrieval_documents)source_documents = self.reprocess_source_documents(query=query,source_docs=retrieval_documents,history=chat_history,prompt_template=PROMPT_TEMPLATE)prompt = self.generate_prompt(query=query,source_docs=source_documents,prompt_template=PROMPT_TEMPLATE)t1 = time.time()for answer_result in self.llm.generatorAnswer(prompt=prompt,history=chat_history,streaming=streaming):resp = answer_result.llm_output["answer"]prompt = answer_result.prompthistory = answer_result.history# logging.info(f"[debug] get_knowledge_based_answer history = {history}")history[-1][0] = queryresponse = {"query": query,"prompt": prompt,"result": resp,"retrieval_documents": retrieval_documents,"source_documents": source_documents}yield response, historyt2 = time.time()debug_logger.info(f"LLM time: {t2 - t1}")

首先注意到這里有個裝飾器@get_time。可以用來記錄執行的時間。

def get_time(func):def inner(*arg, **kwargs):s_time = time.time()res = func(*arg, **kwargs)e_time = time.time()print('函數 {} 執行耗時: {} 秒'.format(func.__name__, e_time - s_time))return resreturn inner

2.1 檢索&粗排

get_source_documents是檢索的過程,即給定了retrieval_queriesmilvus_kb,即query所需要查的數據庫,開始進行查詢。這個的返回結果,會放在retrieval_documents里面,即**“檢索到的文檔”**,下面是源碼。

def get_source_documents(self, queries, milvus_kb, cosine_thresh=None, top_k=None):milvus_kb: MilvusClientif not top_k:top_k = self.top_ksource_documents = []embs = self.embeddings._get_len_safe_embeddings(queries)t1 = time.time()batch_result = milvus_kb.search_emb_async(embs=embs, top_k=top_k, queries=queries)t2 = time.time()debug_logger.info(f"milvus search time: {t2 - t1}")for query, query_docs in zip(queries, batch_result):for doc in query_docs:doc.metadata['retrieval_query'] = query  # 添加查詢到文檔的元數據中doc.metadata['embed_version'] = self.embeddings.embed_versionsource_documents.append(doc)if cosine_thresh:source_documents = [item for item in source_documents if float(item.metadata['score']) > cosine_thresh]return source_documents
  • _get_len_safe_embeddings給定query獲取向量。在上一期RAG開源項目Qanything源碼閱讀2-離線文件處理有講過,這個內部是請求一個向量模型的服務,背后的模型是需要和離線文件處理那個模型一致,所以部署同一個就會比較穩當,當然的,接口也是triton,一個grpc接口,有關GRPC,上次忘了放鏈接,這次放這里心法利器[6] | python grpc實踐,非常建議大家詳細了解并且學會。

  • search_emb_async是用于做向量檢索的。這個就是pymilvus的核心功能了。

  • 此處,查詢出來還要過一個閾值卡控,對相似度達不到閾值的文檔,需要過濾,閾值設置在cosine_thresh

_get_len_safe_embeddings 使用的embedding 代碼(可跳過,繼續回到 get_knowledge_based_answer
# qanything_kernel\connector\embedding\embedding_for_local.py
"""Wrapper around YouDao embedding models."""
from typing import Listfrom qanything_kernel.connector.embedding.embedding_client import EmbeddingClient
from qanything_kernel.configs.model_config import LOCAL_EMBED_SERVICE_URL, LOCAL_EMBED_MODEL_NAME, LOCAL_EMBED_MAX_LENGTH, LOCAL_EMBED_BATCH
from qanything_kernel.utils.custom_log import debug_logger
import concurrent.futures
from tqdm import tqdm embedding_client = EmbeddingClient(server_url=LOCAL_EMBED_SERVICE_URL,model_name=LOCAL_EMBED_MODEL_NAME,model_version='1',resp_wait_s=120,tokenizer_path='qanything_kernel/connector/embedding/embedding_model_0630')class YouDaoLocalEmbeddings:def __init__(self):passdef _get_embedding(self, queries):embeddings = embedding_client.get_embedding(queries, max_length=LOCAL_EMBED_MAX_LENGTH)return embeddingsdef _get_len_safe_embeddings(self, texts: List[str]) -> List[List[float]]:all_embeddings = []batch_size = LOCAL_EMBED_BATCHwith concurrent.futures.ThreadPoolExecutor() as executor:futures = []for i in range(0, len(texts), batch_size):batch = texts[i:i + batch_size]future = executor.submit(self._get_embedding, batch)futures.append(future)debug_logger.info(f'embedding number: {len(futures)}')for future in tqdm(futures):embeddings = future.result()all_embeddings += embeddingsreturn all_embeddings@propertydef embed_version(self):return embedding_client.getModelVersion()

回到 get_knowledge_based_answer
留意到 qanything_kernel\core\local_doc_qa.py 文件里的 get_knowledge_based_answer 里這一串代碼:

retrieval_documents = sorted(deduplicated_docs, key=lambda x: x.metadata['score'], reverse=True)
if rerank and len(retrieval_documents) > 1:debug_logger.info(f"use rerank, rerank docs num: {len(retrieval_documents)}")retrieval_documents = self.rerank_documents(query, retrieval_documents)
  • 此處注意,這里的檢索還涉及一個過程“粗排”(上面第一行代碼),這個粗排是指查詢數據庫的時候,需要根據相似度進行排序,只取TOPN,畢竟如果不進行這個TOP的卡控,那數據庫里所有的數據都會被查出來,這沒什么意義了。這里之所以叫粗排,是因為這種相似度的對比是比較粗略的,只能過濾掉“肯定不是”的那些無關結果。具體“哪個好”,用額外的、更精準的模型來做會更好,達到“優中取優”的目的。

2.2 檢索&粗排

繼續關注這里的 qanything_kernel\core\local_doc_qa.py 的 get_knowledge_based_answer里調用的 rerank_documents,這個就是精排,或者像這里說的重排。

def rerank_documents(self, query, source_documents):return self.rerank_documents_for_local(query, source_documents)def rerank_documents_for_local(self, query, source_documents):if len(query) > 300:  # tokens數量超過300時不使用local rerankreturn source_documentssource_documents_reranked = []try:response = requests.post(f"{self.local_rerank_service_url}/rerank",json={"passages": [doc.page_content for doc in source_documents], "query": query})scores = response.json()for idx, score in enumerate(scores):source_documents[idx].metadata['score'] = scoreif score < 0.35 and len(source_documents_reranked) > 0:continuesource_documents_reranked.append(source_documents[idx])source_documents_reranked = sorted(source_documents_reranked, key=lambda x: x.metadata['score'], reverse=True)except Exception as e:debug_logger.error("rerank error: %s", traceback.format_exc())debug_logger.warning("rerank error, use origin retrieval docs")source_documents_reranked = sorted(source_documents, key=lambda x: x.metadata['score'], reverse=True)return source_documents_reranked

簡單地,這里就是把所有召回回來的文章請求到重排服務來算分,根據算分來進行過濾和排序,篩選出最優的文章。和向量模型類似,一樣是用triton部署的,看模型名像是QAEnsemble_embed_rerank

2.3 檢索文檔后處理

更進一步,需要對文檔進行后處理,即reprocess_source_documents函數。qanything_kernel\core\local_doc_qa.py

#source_documents = self.reprocess_source_documents(query=query,
#                                                           source_docs=retrieval_documents,
#                                                           history=chat_history,
#                                                           prompt_template=PROMPT_TEMPLATE)def reprocess_source_documents(self, query: str,source_docs: List[Document],history: List[str],prompt_template: str) -> List[Document]:# 組裝prompt,根據max_tokenquery_token_num = self.llm.num_tokens_from_messages([query])history_token_num = self.llm.num_tokens_from_messages([x for sublist in history for x in sublist])template_token_num = self.llm.num_tokens_from_messages([prompt_template])# logging.info(f"<self.llm.token_window, self.llm.max_token, self.llm.offcut_token, query_token_num, history_token_num, template_token_num>, types = {type(self.llm.token_window), type(self.llm.max_token), type(self.llm.offcut_token), type(query_token_num), type(history_token_num), type(template_token_num)}, values = {query_token_num, history_token_num, template_token_num}")limited_token_nums = self.llm.token_window - self.llm.max_token - self.llm.offcut_token - query_token_num - history_token_num - template_token_numnew_source_docs = []total_token_num = 0for doc in source_docs:doc_token_num = self.llm.num_tokens_from_docs([doc])if total_token_num + doc_token_num <= limited_token_nums:new_source_docs.append(doc)total_token_num += doc_token_numelse:remaining_token_num = limited_token_nums - total_token_numdoc_content = doc.page_contentdoc_content_token_num = self.llm.num_tokens_from_messages([doc_content])while doc_content_token_num > remaining_token_num:# Truncate the doc content to fit the remaining tokensif len(doc_content) > 2 * self.llm.truncate_len:doc_content = doc_content[self.llm.truncate_len: -self.llm.truncate_len]else:  # 如果最后不夠truncate_len長度的2倍,說明不夠切了,直接賦值為空doc_content = ""breakdoc_content_token_num = self.llm.num_tokens_from_messages([doc_content])doc.page_content = doc_contentnew_source_docs.append(doc)breakdebug_logger.info(f"limited token nums: {limited_token_nums}")debug_logger.info(f"template token nums: {template_token_num}")debug_logger.info(f"query token nums: {query_token_num}")debug_logger.info(f"history token nums: {history_token_num}")debug_logger.info(f"new_source_docs token nums: {self.llm.num_tokens_from_docs(new_source_docs)}")return new_source_docs
  • 這里的llm,是一個自己封裝好的大模型工具,具體是在qanything_kernel\connector\llm\llm_for_fastchat.py這個位置。里面支持計算token請求大模型等通用功能。這個工具可以結合自己場景的需求搬過去直接使用。

  • 計算limited_token_nums主要是方便組裝prompt,避免某些文字被吃掉

  • 這里是需要對文檔進行新的拼接和調整,如果查詢的文檔太多太長,則需要截斷,且截斷的時候需要注意,要保證截斷的位置必須是完整地句子,如果不夠長直接不切了。

2.4 prompt和請求大模型

然后就是開始生成promptgenerate_prompt。說白了就是一個簡單的拼接。另外,這里的prompt拼接,更多使用replace來完成,之前有看過別的模式,例如用字符串的format應該也可以,不過replace的適用范圍會更廣一些。

def generate_prompt(self, query, source_docs, prompt_template):context = "\n".join([doc.page_content for doc in source_docs])prompt = prompt_template.replace("{question}", query).replace("{context}", context)return prompt

順帶就看看他們的prompt吧,實際上并不復雜。

PROMPT_TEMPLATE = """參考信息:
{context}
---
我的問題或指令:
{question}
---
請根據上述參考信息回答我的問題或回復我的指令。前面的參考信息可能有用,也可能沒用,你需要從我給出的參考信息中選出與我的問題最相關的那些,來為你的回答提供依據。回答一定要忠于原文,簡潔但不丟信息,不要胡亂編造。我的問題或指令是什么語種,你就用什么語種回復,
你的回復:"""

最后一步就是開始請求大模型了。即generatorAnswer函數。

def generatorAnswer(self, prompt: str,history: List[List[str]] = [],streaming: bool = False) -> AnswerResult:if history is None or len(history) == 0:history = [[]]logging.info(f"history_len: {self.history_len}")logging.info(f"prompt: {prompt}")logging.info(f"prompt tokens: {self.num_tokens_from_messages([{'content': prompt}])}")logging.info(f"streaming: {streaming}")response = self._call(prompt, history[:-1], streaming)complete_answer = ""for response_text in response:if response_text:chunk_str = response_text[6:]if not chunk_str.startswith("[DONE]"):chunk_js = json.loads(chunk_str)complete_answer += chunk_js["answer"]history[-1] = [prompt, complete_answer]answer_result = AnswerResult()answer_result.history = historyanswer_result.llm_output = {"answer": response_text}answer_result.prompt = promptyield answer_result

這里就是請求大模型的基本話術了,相對還是比較簡單的,一方面是請求大模型,另一方面是解析大模型內的結果。有留意到,這里有對內容做一些校驗:

if response_text:chunk_str = response_text[6:]if not chunk_str.startswith("[DONE]"):chunk_js = json.loads(chunk_str)complete_answer += chunk_js["answer"]

可以看出應該是有一些泛用性處理,能解決更多復雜的問題吧。

小結

本文把QAnything項目內的重要的推理部分穿講了一遍,可以看出這個項目已經非常完成,基本具備上線所需的關鍵部分,同時也有很嚴格的校驗邏輯,嚴格程度很高也比較穩定,經過這個學習,自己對工程代碼和具體實施的理解有了很大的提升,希望大家也有收獲。當然有空再復習一遍應該有更大收獲。

QAnything在服務的完整性、健壯性,以及文檔處理上都有了很多的更新,但都不要指望用上就能達到很高的水準,需要進一步提升還需要更多內里的修煉:

  • query理解輔助更好地提升檢索的準確性
  • 聯合訓練提升大模型和檢索結果的協同
  • 更深入定制的文檔處理提升內容的可讀性等

補充

qanything_kernel\connector\llm\llm_for_fastchat.py
from abc import ABC
import tiktoken
import os
from dotenv import load_dotenv
from openai import OpenAI
from typing import Optional, List
import sys
import json
import requests
import logging
sys.path.append("../../../")
from qanything_kernel.connector.llm.base import (BaseAnswer, AnswerResult)
from qanything_kernel.configs.model_config import LOCAL_LLM_SERVICE_URL, LOCAL_LLM_MODEL_NAME, LOCAL_LLM_MAX_LENGTHload_dotenv()logging.basicConfig(level=logging.INFO)class OpenAICustomLLM(BaseAnswer, ABC):model: str = LOCAL_LLM_MODEL_NAMEtoken_window: int = LOCAL_LLM_MAX_LENGTHmax_token: int = 512offcut_token: int = 50truncate_len: int = 50temperature: float = 0stop_words: str = Nonehistory: List[List[str]] = []history_len: int = 2def __init__(self):super().__init__()# self.client = OpenAI(base_url="http://localhost:7802/v1", api_key="EMPTY")if LOCAL_LLM_SERVICE_URL.startswith("http://"):base_url = f"{LOCAL_LLM_SERVICE_URL}/v1" else:base_url = f"http://{LOCAL_LLM_SERVICE_URL}/v1" self.client = OpenAI(base_url=base_url, api_key="EMPTY")@propertydef _llm_type(self) -> str:return "CustomLLM using FastChat w/ huggingface transformers or vllm backend"@propertydef _history_len(self) -> int:return self.history_lendef set_history_len(self, history_len: int = 10) -> None:self.history_len = history_lendef token_check(self, query: str) -> int:if LOCAL_LLM_SERVICE_URL.startswith("http://"):base_url = f"{LOCAL_LLM_SERVICE_URL}/api/v1/token_check" else:base_url = f"http://{LOCAL_LLM_SERVICE_URL}/api/v1/token_check" headers = {"Content-Type": "application/json"}response = requests.post(base_url, data=json.dumps({'prompts': [{'model': self.model, 'prompt': query, 'max_tokens': self.max_token}]}),headers=headers, timeout=60)# {'prompts': [{'fits': True, 'tokenCount': 317, 'contextLength': 8192}]}result = response.json()token_num = 0try:token_num = result['prompts'][0]['tokenCount']return token_numexcept Exception as e:logging.error(f"token_check Exception {base_url} w/ {e}")return token_numdef num_tokens_from_messages(self, message_texts):num_tokens = 0for message in message_texts:num_tokens += self.token_check(message)return num_tokensdef num_tokens_from_docs(self, docs):num_tokens = 0for doc in docs:num_tokens += self.token_check(doc.page_content)return num_tokensdef _call(self, prompt: str, history: List[List[str]], streaming: bool=False) -> str:messages = []for pair in history:question, answer = pairmessages.append({"role": "user", "content": question})messages.append({"role": "assistant", "content": answer})messages.append({"role": "user", "content": prompt})logging.info(messages)try:if streaming:response = self.client.chat.completions.create(model=self.model,messages=messages,stream=True,max_tokens=self.max_token,# temperature=self.temperature,stop=[self.stop_words] if self.stop_words is not None else None,)for event in response:if not isinstance(event, dict):event = event.model_dump()if event["choices"] is None:event_text = event["text"] + " error_code:" + str(event["error_code"])else:event_text = event["choices"][0]['delta']['content']if isinstance(event_text, str) and event_text != "":# logging.info(f"[debug] event_text = [{event_text}]")delta = {'answer': event_text}yield "data: " + json.dumps(delta, ensure_ascii=False)else:response = self.client.chat.completions.create(model=self.model,messages=messages,stream=False,max_tokens=self.max_token,# temperature=self.temperature,stop=[self.stop_words] if self.stop_words is not None else None,)# logging.info(f"[debug] response.choices = [{response.choices}]")event_text = response.choices[0].message.content if response.choices else ""delta = {'answer': event_text}yield "data: " + json.dumps(delta, ensure_ascii=False)except Exception as e:logging.info(f"Error calling API: {e}")delta = {'answer': f"{e}"}yield "data: " + json.dumps(delta, ensure_ascii=False)finally:# logging.info("[debug] try-finally")yield f"data: [DONE]\n\n"def generatorAnswer(self, prompt: str,history: List[List[str]] = [],streaming: bool = False) -> AnswerResult:if history is None or len(history) == 0:history = [[]]logging.info(f"history_len: {self.history_len}")logging.info(f"prompt: {prompt}")logging.info(f"prompt tokens: {self.num_tokens_from_messages([prompt])}")logging.info(f"streaming: {streaming}")response = self._call(prompt, history[:-1], streaming)complete_answer = ""for response_text in response:if response_text:chunk_str = response_text[6:]if not chunk_str.startswith("[DONE]"):chunk_js = json.loads(chunk_str)complete_answer += chunk_js["answer"]history[-1] = [prompt, complete_answer]answer_result = AnswerResult()answer_result.history = historyif streaming:answer_result.llm_output = {"answer": response_text}else:answer_result.llm_output = {"answer": complete_answer}answer_result.prompt = promptyield answer_resultif __name__ == "__main__":base_url = f"http://{LOCAL_LLM_SERVICE_URL}/api/v1/token_check" headers = {"Content-Type": "application/json"}query = "hello"response = requests.post(base_url, data=json.dumps({'prompts': [{'model': LOCAL_LLM_MODEL_NAME, 'prompt': query, 'max_tokens': 512}]}),headers=headers, timeout=60)# {'prompts': [{'fits': True, 'tokenCount': 317, 'contextLength': 8192}]}result = response.json()logging.info(f"[debug] result = {result}")llm = OpenAICustomLLM()streaming = Truechat_history = []prompt = "你是誰"prompt = """參考信息:
中央紀委國家監委網站訊 據山西省紀委監委消息:山西轉型綜合改革示范區黨工委副書記、管委會副主任董良涉嫌嚴重違紀違法,目前正接受山西省紀委監委紀律審查和監察調查。\\u3000\\u3000董良簡歷\\u3000\\u3000董良,男,漢族,1964年8月生,河南鹿邑人,在職研究生學歷,郵箱random@xxx.com,聯系電話131xxxxx909,1984年3月加入中國共產黨,1984年8月參加工作\\u3000\\u3000歷任太原經濟技術開發區管委會副主任、太原武宿綜合保稅區專職副主任,山西轉型綜合改革示范區黨工委委員、管委會副主任。2021年8月,任山西轉型綜合改革示范區黨工委副書記、管委會副主任。(山西省紀委監委)
---
我的問題或指令:
幫我提取上述人物的中文名,英文名,性別,國籍,現任職位,最高學歷,畢業院校,郵箱,電話
---
請根據上述參考信息回答我的問題或回復我的指令。前面的參考信息可能有用,也可能沒用,你需要從我給出的參考信息中選出與我的問題最相關的那些,來為你的回答提供依據。回答一定要忠于原文,簡潔但不丟信息,不要胡亂編造。我的問題或指令是什么語種,你就用什么語種回復,
你的回復:"""final_result = ""for answer_result in llm.generatorAnswer(prompt=prompt,history=chat_history,streaming=streaming):resp = answer_result.llm_output["answer"]if "DONE" not in resp:final_result += json.loads(resp[6:])["answer"]# logging.info(resp)logging.info(f"final_result = {final_result}")

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/diannao/39755.shtml
繁體地址,請注明出處:http://hk.pswp.cn/diannao/39755.shtml
英文地址,請注明出處:http://en.pswp.cn/diannao/39755.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

Pytest+Allure+Yaml+PyMsql+Jenkins+Gitlab接口自動化(五)Jenkins配置

一、背景 Jenkins&#xff08;本地宿主機搭建&#xff09; 拉取GitLab(服務器)代碼到在Jenkins工作空間本地運行并生成Allure測試報告 二、框架改動點 框架主運行程序需要先注釋掉運行代碼&#xff08;可不改&#xff0c;如果運行報allure找不到就直接注釋掉&#xff09; …

中英雙語介紹美國的州:路易斯安那州(Louisiana)

中文版 路易斯安那州簡介 路易斯安那州位于美國南部&#xff0c;以其豐富的歷史文化、多樣的自然景觀和獨特的音樂和美食傳統而聞名。以下是對路易斯安那州的詳細介紹&#xff0c;包括其地理位置、人口、經濟、教育、文化和主要城市。 地理位置 路易斯安那州東臨密西西比州…

鴻蒙應用開發-時間屏幕

點擊下載源碼&#xff1a; https://download.csdn.net/download/liuhaikang/89509449 做一個時間屏幕&#xff0c;可以點擊切換白色和黑色&#xff0c;有漸變效果&#xff0c;使用到了鴻蒙的動畫效果。 在這個設計中&#xff0c;我們首先引入了通用能力包&#xff0c;以實現功…

Kubernetes 離線安裝的坑我采了

Kubernetes 離線安裝的坑我采了 一、Error from server: Get "https://xx.xx.xx.xx:10250/containerLogs/kube-system/calico-node-8dnvs/calico-node": tls: failed to verify certificate: x509: certificate signed by unknown authority二、calico 或 pod 啟動正…

cesium公交車軌跡漫游

個人博客&#xff1a;CSDN 博客-滿分觀察網友 z 演示地址&#xff1a;嗶哩嗶哩-滿分觀察網友 z 這是一個用 Cesium.js 做的公交車軌跡漫游&#xff0c;實現的功能有加載站點和道路軌跡點數據、監聽車輛的實時位置、車輛控制器。滾動屏等等。 文章目錄 1. 地圖初始化2. 數據渲…

【高中數學/基本不等式】已知:x,y均為正實數,且xy+2x+y=4 求:x+y的最小值?

【問題】 已知&#xff1a;x,y均為正實數&#xff0c;且xy2xy4 求&#xff1a;xy的最小值&#xff1f; 【來源】 https://www.ixigua.com/7147585275823292942?logTagf25494de7fce23a3a3d0 【解答】 解&#xff1a; 由xy2xy4 兩邊加二得 xy2xy24 2 分解因式得 (x1)(…

0090__【Git系列】merge和rebase的區別

【Git系列】merge和rebase的區別_rebase和merge的區別-CSDN博客 git中rebase和merge的區別是什么-git-PHP中文網 https://blog.51cto.com/qzcsbj/9444199

從零搭建教育管理系統:Java + Vue.js 教學-02

第三步:創建實體類和 Mapper 接口 現在我們已經設計好了數據庫表,接下來使用 MyBatis-Plus 將這些表映射到 Java 對象,以便在代碼中輕松地進行操作。 1. 創建實體類 在 src/main/java/<your_package>/entity 目錄下 (如果沒有該目錄,請手動創建),創建與數據庫表對應…

MyBatis(20)MyBatis 事務管理如何實現

MyBatis 的事務管理是通過底層 JDBC 連接的事務管理機制來實現的。事務管理對于任何涉及多個數據庫操作的應用程序來說都是至關重要的&#xff0c;它確保數據的一致性和完整性。在 MyBatis 中&#xff0c;事務管理可以通過 SQL 會話&#xff08;SqlSession&#xff09;來實現。…

【WEB前端2024】3D智體編程:喬布斯3D紀念館-第53課-語音指令跳舞

【WEB前端2024】3D智體編程&#xff1a;喬布斯3D紀念館-第53課-語音指令跳舞 使用dtns.network德塔世界&#xff08;開源的智體世界引擎&#xff09;&#xff0c;策劃和設計《喬布斯超大型的開源3D紀念館》的系列教程。dtns.network是一款主要由JavaScript編寫的智體世界引擎&…

可信計算的完整專用名詞列表

可信計算的完整專用名詞列表 Trusted Computing - 可信計算Trusted Platform Module (TPM) - 可信平臺模塊Hardware Root of Trust - 硬件根信任Secure Boot - 安全啟動Remote Attestation - 遠程證明Integrity Measurement - 完整性度量Measurement Log - 度量日志Attestatio…

Android 圖像效果的奧秘

在當今數字化時代&#xff0c;圖像已經成為人們生活和工作中不可或缺的一部分。而在 Android 系統中&#xff0c;圖像效果的應用更是豐富多彩&#xff0c;為用戶帶來了更加出色的視覺體驗。本文將深入探討 Android 圖像效果的原理、實現方法以及應用場景&#xff0c;幫助讀者更…

面試題springboot面試

文章目錄 Spring的依賴注入構造器注入stetter注入屬性注入 springboot的優勢第一開箱即用約定大于配置內嵌tomcat服務器 javaweb的三大組件springboot的自動配置原理SpringIoc的實現機制springmvcspring如何簡化開發 Spring的依賴注入 構造器注入 stetter注入 屬性注入 使用…

按位異或^

在 Python 中&#xff0c;a ^ b 表示按位異或運算符。按位異或運算符對整數的每一位進行運算&#xff0c;如果對應位上的兩個二進制數字不同&#xff0c;則結果為 1&#xff0c;否則為 0。 示例 a 5 # 二進制: 0101 b 3 # 二進制: 0011result a ^ b print(result) # 輸…

私域流量:塑造企業數字營銷的未來

在當今數字化的時代&#xff0c;流量成為了商業世界中的新貨幣&#xff0c;而“私域流量”更是其中的黃金。但“私域流量”究竟是什么&#xff1f;它如何成為企業數字化轉型和營銷策略中不可或缺的一部分&#xff1f;本文將探討私域流量的概念&#xff0c;并通過案例分析其運營…

前端進階:Vue.js

目錄 框架&#xff1a; 助解&#xff1a; 框架&#xff1a; VUE 什么是Vue.js? Vue.js優點 Vue安裝 方式一&#xff1a;直接用<script>引入 方式二&#xff1a;命令行工具 第一個Vue程序 代碼 代碼解釋&#xff1a; 運行 Vue指令 v-text v-html v-tex…

Mysql和ES使用匯總

一、mysql和ES在業務上的配合使用 一般使用時使用ES 中存儲全文檢索的關鍵字與獲取的商品詳情的id&#xff0c;通過ES查詢獲取查詢商品的列表中展示的數據&#xff0c;通過展示id 操作去獲取展示商品的所有信息。mysql根據id去查詢數據庫數據是很快的&#xff1b; 為什么ES一般…

JavaScript如何聲明json對象

在JavaScript中&#xff0c;JSON&#xff08;JavaScript Object Notation&#xff09;對象實際上是以JavaScript對象的形式表示的。JSON是一種輕量級的數據交換格式&#xff0c;它基于ECMAScript&#xff08;歐洲計算機協會制定的js規范&#xff09;的一個子集&#xff0c;采用…

10 - Python文件編程和異常

文件和異常 在實際開發中&#xff0c;常常需要對程序中的數據進行持久化操作&#xff0c;而實現數據持久化最直接簡單的方式就是將數據保存到文件中。說到“文件”這個詞&#xff0c;可能需要先科普一下關于文件系統的知識&#xff0c;對于這個概念&#xff0c;維基百科上給出…

【CSharp】在class中申明public const int常量的應用

【CSharp】在class中申明public const int常量的應用 1.背景2.代碼3.運行1.背景 常量本身是靜態的,即常量屬于class本身與其實例化對象無關,且常量不可更改。 以上兩種特性在許多應用場景中都有其優勢和應用場合。 1.在定義定義固定值的場景,例如狀態碼、事件類型、配置參…