目錄
摘要
Abstract
1 Schema
2 Prompt
3 KAG-Builder
3.1 reader
3.2?splitter
3.3?extractor
3.4?vectorizer
3.5 writer
3.6 可選組件
4 示例
總結
摘要
本周深入學習了 KAG 項目中的 Schema、Prompt 以及 KAG-Builder 相關代碼知識,涵蓋了其定義、功能以及在知識圖譜構建中的應用。Schema定義了知識圖譜的架構,Prompt用于引導模型生成符合預期的輸出,而 KAG-Builder則是通過結構化信息獲取、知識語義對齊和圖存儲寫入來構建知識圖譜,其中包含reader、splitter、extractor、vectorizer、writer等多個組件。
Abstract
This week, we have in-depth knowledge of the Schema, Prompt and KAG-Builder related code knowledge in the KAG project, covering their definitions, functions, and applications in the construction of knowledge graphs. The schema defines the architecture of the knowledge graph, the Prompt is used to guide the model to generate the expected output, and the KAG-Builder is to build the knowledge graph through structured information acquisition, knowledge semantic alignment, and graph storage writing, which includes multiple components such as reader, splitter, extractor, vectorizer, and writer.
1 Schema
Schema是定義知識圖譜語義框架和數據組織結構的核心組件,它規定了知識圖譜中數據的類型、關系及邏輯約束,是構建和管理知識圖譜的基礎,其中:
- 類型:定義一類物體的行為,類型與實例的關系可類比與java語言中類和對象的關系;包括實體類型(EntityType),概念類型(ConceptType),事件類型(EventType),標準類型等;
- 實例:類比于java語言中的對象,是在圖數據庫中的node此處要澄清,類型與實例只是SPG邏輯上的概念,在實際的圖數據庫存儲中,都為node;
- 屬性:實體中可定義屬性,這與java類中的屬性相似,注意,在SPG中某些屬性會被處理成關系節點,如User類型下有一個phone的標準類型屬性,在User實例的創建中,會生成一個phone標準類型的實例node,并創建一個由User實例指向phone實例的edge,這樣做可以很好的解決知識圖譜稀疏性的問題,也能提取共性屬性;
- 關系:類比于圖數據庫中的edge,可以有屬性,且可以增加多種限制;
- 規則:定義屬性,關系時可定義相關規則,在規則中創建約束條件,如相反關系等。
概念類型:
- 實體從具體到一般的抽象,表述的是一組實體實例或事件實例的集合,是一種分類體系;
- 相對靜態,也是常識知識,具有較強復用性,如人群標簽、事件分類、行政區劃分類等;
- 概念類型的關系只允許創建在7大類里定義的謂詞,概念類型需定義上下謂詞。
HospitalDepartment(科室): ConceptTypehypernymPredicate: isA
實體類型(必須存在):
- 業務相關性比較強的客觀對象,通多屬性、多關系刻畫的多元復合結構類型,如用戶、企業、商戶等;
- 所有類型會默認繼承Thing類型,默認創建id、name和description屬性。
Disease(疾病): EntityTypeproperties:desc(描述): Textindex: TextsemanticType(語義類型): Textindex: Textcomplication(并發癥): Diseaseconstraint: MultiValuecommonSymptom(常見癥狀): Symptomconstraint: MultiValueapplicableMedicine(適用藥品): Medicineconstraint: MultiValuehospitalDepartment(就診科室): HospitalDepartmentconstraint: MultiValuediseaseSite(發病部位): HumanBodyPartconstraint: MultiValuerelations:abnormal(異常指征): Indicator
事件類型:
- 加入時間、空間等約束的時空多元類型,如通過NLP、CV等抽取出來的行業事件、企業事件、診療事件或因購買、核銷、注冊等行為產生的用戶行為事件;
- 事件類型必須有主題subject,可以有發生事件、地點等基礎信息。
Event(醫療風險事件): EventTypeproperties:subject(主體): Doctortime(時間): Date
2 Prompt
Prompt是引導LLM與知識圖譜協同工作的核心指令,其設計融合了結構化知識約束和語義對齊能力,旨在提升垂直領域的決策精準性與邏輯嚴謹性。
在KAG中的Prompt主要要以下幾類:
- 實體prompt:
"instruction": "作為一個圖譜知識抽取的專家, 你需要基于定義了實體類型及對應屬性的schema,從input字段的文本中抽取出所有的實體及其屬性,schema中標記為List的屬性返回list,未能提取的屬性返回null。以標準json list格式輸出,list中每個元素形如{category: properties},你可以參考example字段中給出的示例格式。注意實體屬性的SemanticType指的是一個相比實體類型更具體且明確定義的類型,例如Person類型的SemanticType可以是Professor或Actor。","example": [{"input": "周杰倫(Jay Chou),1979年1月18日出生于臺灣省新北市,祖籍福建省永春縣,華語流行樂男歌手、音樂人、演員、導演、編劇,畢業于淡江中學。2000年,發行個人首張音樂專輯《Jay》 [26]。2023年憑借《最偉大的作品》獲得第一屆浪潮音樂大賞年度制作、最佳作曲、最佳音樂錄影帶三項大獎。","output": [{"category": "Person","properties": {"name": "周杰倫","semanticType": "Musician","description": "華語流行樂男歌手、音樂人、演員、導演、編劇",},},{"category": "GeographicLocation","properties": {"name": "臺灣省新北市","semanticType": "City","description": "周杰倫的出生地",},},{"category": "GeographicLocation","properties": {"name": "福建省永春縣","semanticType": "County","description": "周杰倫的祖籍",},},{"category": "Organization","properties": {"name": "淡江中學","semanticType": "School","description": "周杰倫的畢業學校",},},{"category": "Works","properties": {"name": "Jay","semanticType": "Album","description": "周杰倫的個人首張音樂專輯",},},{"category": "Works","properties": {"name": "最偉大的作品","semanticType": "MusicVideo","description": "周杰倫憑借此作品獲得多項音樂大獎",},},],}],
- 事件prompt:
"instruction": "作為一個知識圖譜圖譜事件抽取的專家, 你需要基于定義的事件類型及對應屬性的schema,從input字段的文本中抽取出所有的事件及其屬性,schema中標記為List的屬性返回list,未能提取的屬性返回null。以標準json list格式輸出,list中每個元素形如{category: properties},你可以參考example字段中給出的示例格式。","example": {"input": "1986年,周星馳被調入無線電視臺戲劇組;同年,他在單元情景劇《哥哥的女友》中飾演可愛活潑又略帶羞澀的潘家偉,這也是他第一次在情景劇中擔任男主角;之后,他還在溫兆倫、郭晉安等人主演的電視劇中跑龍套。","output": [{"category": "Event","properties": {"name": "周星馳被調入無線電視臺戲劇組","abstract": "1986年,周星馳被調入無線電視臺戲劇組。","subject": "周星馳","time": "1986年","location": "無線電視臺","participants": [],"semanticType": "調動",},},{"category": "Event","properties": {"name": "周星馳在《哥哥的女友》中飾演潘家偉","abstract": "1986年,周星馳在單元情景劇《哥哥的女友》中飾演可愛活潑又略帶羞澀的潘家偉,這也是他第一次在情景劇中擔任男主角。","subject": "周星馳","time": "1986年","location": None,"participants": [],"semanticType": "演出",},},{"category": "Event","properties": {"name": "周星馳跑龍套","abstract": "1986年,周星馳在溫兆倫、郭晉安等人主演的電視劇中跑龍套。","subject": "周星馳","time": "1986年","location": None,"participants": ["溫兆倫", "郭晉安"],"semanticType": "演出",},},],},
- 實體標準化prompt:
"""
{"instruction": "input字段包含用戶提供的上下文。命名實體字段包含從上下文中提取的命名實體,這些可能是含義不明的縮寫、別名或俚語。為了消除歧義,請嘗試根據上下文和您自己的知識提供這些實體的官方名稱。請注意,具有相同含義的實體只能有一個官方名稱。請按照提供的示例中的輸出字段格式,以單個JSONArray字符串形式回復,無需任何解釋。","example": {"input": "煩躁不安、語妄、失眠酌用鎮靜藥,禁用抑制呼吸的鎮靜藥。3.并發癥的處理經抗菌藥物治療后,高熱常在24小時內消退,或數日內逐漸下降。若體溫降而復升或3天后仍不降者,應考慮SP的肺外感染,如腋胸、心包炎或關節炎等。治療:接胸腔壓力調節管+吸引機負壓吸引水瓶裝置閉式負壓吸引宜連續,如經12小時后肺仍未復張,應查找原因。","named_entities": [{"name": "煩躁不安", "category": "Symptom"},{"name": "語妄", "category": "Symptom"},{"name": "失眠", "category": "Symptom"},{"name": "鎮靜藥", "category": "Medicine"},{"name": "肺外感染", "category": "Disease"},{"name": "胸腔壓力調節管", "category": "MedicalEquipment"},{"name": "吸引機負壓吸引水瓶裝置", "category": "MedicalEquipment"},{"name": "閉式負壓吸引", "category": "SurgicalOperation"}],"output": [{"name": "煩躁不安", "category": "Symptom", "official_name": "焦慮不安"},{"name": "語妄", "category": "Symptom", "official_name": "譫妄"},{"name": "失眠", "category": "Symptom", "official_name": "失眠癥"},{"name": "鎮靜藥", "category": "Medicine", "official_name": "鎮靜劑"},{"name": "肺外感染", "category": "Disease", "official_name": "肺外感染"},{"name": "胸腔壓力調節管", "category": "MedicalEquipment", "official_name": "胸腔引流管"},{"name": "吸引機負壓吸引水瓶裝置", "category": "MedicalEquipment", "official_name": "負壓吸引裝置"},{"name": "閉式負壓吸引", "category": "SurgicalOperation", "official_name": "閉式負壓引流"}]},"input": $input,"named_entities": $named_entities,
} """
KAG中的Prompt是連接大模型與知識圖譜的“語義橋梁”,通過結構化知識注入、符號邏輯引導和動態上下文適配,解決了傳統大模型在垂直領域中的幻覺、邏輯混亂等問題。
3 KAG-Builder
構建器鏈為一個有向無環圖,有三種類型的類,用于不同場景的知識圖譜構建:
- 結構化構建器鏈:處理結構化數據(如數據庫表、CSV文件),Mapping(映射器)->Vectorizer(向量器,可選)->Writer(寫入器)
- mapping:將原始數據按預定義Schema轉換為中間格式;
- vectorizer:將結構化數據編碼為向量;
- writer:將處理后的數據持久化到存儲系統。
@KAGBuilderChain.register("structured")
@KAGBuilderChain.register("structured_builder_chain")
class DefaultStructuredBuilderChain(KAGBuilderChain):def __init__(self,mapping: MappingABC,writer: SinkWriterABC,vectorizer: VectorizerABC = None,):self.mapping = mappingself.writer = writerself.vectorizer = vectorizerdef build(self, **kwargs):if self.vectorizer:chain = self.mapping >> self.vectorizer >> self.writerelse:chain = self.mapping >> self.writerreturn chain
如下圖所示,為KAG-Builder的處理非結構化數據的流程鏈:
- 非結構化構建器鏈:處理非結構化文本(如 PDF、TXT),Reader(讀取器)->Spliter(切分器)->Extractor(提取器)->Vectorizer(向量器)->PostProcess(后處理,可選)->Writer(寫入器):
- reader:加載原始文件,生成Chunk或SubGraph;
- spliter:將大Chunk分割為更小的chunk;
- extractor:從文本中抽取實體、關系、事件,生成SubGraph;
- vectorizer:將文本或圖譜編碼為向量;
- post_process:優化圖譜結構;
- writer:將構建好的知識圖譜數據寫入到指定的存儲介質中。
@KAGBuilderChain.register("unstructured")
@KAGBuilderChain.register("unstructured_builder_chain")
class DefaultUnstructuredBuilderChain(KAGBuilderChain):def __init__(self,reader: ReaderABC,splitter: SplitterABC = None,extractor: ExtractorABC = None,vectorizer: VectorizerABC = None,writer: SinkWriterABC = None,post_processor: PostProcessorABC = None,):self.reader = readerself.splitter = splitterself.extractor = extractorself.vectorizer = vectorizerself.post_processor = post_processorself.writer = writerdef build(self, **kwargs):chain = self.reader >> self.splitterif self.extractor:chain = chain >> self.extractorif self.vectorizer:chain = chain >> self.vectorizerif self.post_processor:chain = chain >> self.post_processorif self.writer:chain = chain >> self.writerreturn chain
-
外部知識注入鏈:將外部知識圖譜(如 Wikidata)合并到當前圖譜,ExternalGraphLoader(外部圖譜加載器)->Vectorizer(向量器,可選)->Writer(寫入器):
-
externalgraph:讀取外部圖譜數據;
-
vectorizer:編碼外部圖譜;
-
writer:將外部圖譜與當前圖譜融合。
-
@KAGBuilderChain.register("domain_kg_inject_chain")
class DomainKnowledgeInjectChain(KAGBuilderChain):def __init__(self,external_graph: ExternalGraphLoaderABC,writer: SinkWriterABC,vectorizer: VectorizerABC = None,):self.external_graph = external_graphself.writer = writerself.vectorizer = vectorizerdef build(self, **kwargs):if self.vectorizer:chain = self.external_graph >> self.vectorizer >> self.writerelse:chain = self.external_graph >> self.writerreturn chain
3.1 reader
reader基于抽象基類ReaderABC,用于將文件(pdf、txt、csv等類型)內容解析為Chunk對象,以下是Reader文件中TXTReader的代碼:
@ReaderABC.register("txt")
@ReaderABC.register("txt_reader")
class TXTReader(ReaderABC):def _invoke(self, input: Input, **kwargs) -> List[Output]:if not input:raise ValueError("Input cannot be empty")try:if os.path.exists(input):with open(input, "r", encoding="utf-8") as f:content = f.read()else:content = inputexcept OSError as e:raise IOError(f"Failed to read file: {input}") from ebasename, _ = os.path.splitext(os.path.basename(input))chunk = Chunk(id=generate_hash_id(input),name=basename,content=content,)return [chunk]
TXTReader為處理文本讀取的主要方法,此方法讀取輸入的內容(可以是文件路徑或文本內容)并將其轉換為塊,其中:
- 參數:
- input (輸入):輸入字符串,可以是文本文件或直接文本內容的路徑。
- ?**kwargs:其他關鍵字參數,當前未使用,但保留以備將來擴展。
- 返回:
- ?List[Output]:包含 Chunk 對象的列表,每個對象代表讀取的一段文本。
- 異常處理:
- ValueError:如果輸入為空。
- IOError:如果讀取輸入指定的文件時出現問題。
3.2?splitter
splitter能夠根據指定的長度和窗口大小將文本拆分為多個小塊。它支持按句子邊界拆分和嚴格按長度拆分兩種模式,并且能夠處理表格數據。
LengthSplitter類繼承自 BaseTableSplitter,來實現根據文本長度和窗口大小將文本拆分為小塊的功能。分為以下幾個步驟:
- __init__:構造函數,初始化拆分長度、窗口長度和是否嚴格按長度拆分:
- split_length (int):每個 chunk 的最大長度;
- window_length (int):塊之間的重疊長度;
- strict_length (bool):是否嚴格按長度拆分而不保留句子,默認為 False。
@SplitterABC.register("length")
@SplitterABC.register("length_splitter")
class LengthSplitter(BaseTableSplitter):def __init__(self,split_length: int = 500,window_length: int = 100,strict_length: bool = False,):super().__init__()self.split_length = split_lengthself.window_length = window_lengthself.strict_length = strict_length
- input_types?output_types:定義輸入和輸出類型均為Chunk:
@propertydef input_types(self) -> Type[Input]:return Chunk@propertydef output_types(self) -> Type[Output]:return Chunk
- chunk_breakdown:遞歸地將文本塊拆分為更小的塊:
def chunk_breakdown(self, chunk):chunks = self.logic_break(chunk)if chunks:res_chunks = []for c in chunks:res_chunks.extend(self.chunk_breakdown(c))else:res_chunks = self.slide_window_chunk(chunk, self.split_length, self.window_length)return res_chunks
- split_sentence:將文本按句子分隔符拆分為句子列表:
- 參數:content (str),需要拆分為句子的內容;
- 返回值:List[str],句子列表。
def split_sentence(self, content):sentence_delimiters = ".。??!!" if KAG_PROJECT_CONF.language == "en" else "。?!"output = []start = 0for idx, char in enumerate(content):if char in sentence_delimiters:end = idxtmp = content[start : end + 1].strip()if len(tmp) > 0:output.append(tmp.strip())start = idx + 1res = content[start:].strip()if len(res) > 0:output.append(res)return output
- slide_window_chunk:使用滑動窗口方法將內容拆分為多個塊:
- 參數:
- org_chunk (Chunk):要拆分的原始Chunk;
- chunk_size (int, optional):每個塊的最大大小,默認值為2000;
- ?window_length (int,可選):塊之間的重疊長度,默認值為300;
- ?sep (str,可選):用于連接句子的分隔符,默認為 “n”;
- 返回: List[Chunk],Chunk對象的列表。
- 參數:
def slide_window_chunk(self,org_chunk: Chunk,chunk_size: int = 2000,window_length: int = 300,sep: str = "\n",) -> List[Chunk]:if org_chunk.type == ChunkTypeEnum.Table:table_chunks = self.split_table(org_chunk=org_chunk, chunk_size=chunk_size, sep=sep)if table_chunks is not None:return table_chunks# 如果啟用嚴格長度切分,不按句子切分if getattr(self, "strict_length", False):return self.strict_length_chunk(org_chunk)content = self.split_sentence(org_chunk.content)splitted = []cur = []cur_len = 0for sentence in content:if cur_len + len(sentence) > chunk_size:if cur:splitted.append(cur)tmp = []cur_len = 0for item in cur[::-1]:if cur_len >= window_length:breaktmp.append(item)cur_len += len(item)cur = tmp[::-1]cur.append(sentence)cur_len += len(sentence)if len(cur) > 0:splitted.append(cur)output = []for idx, sentences in enumerate(splitted):chunk = Chunk(id=generate_hash_id(f"{org_chunk.id}#{idx}"),name=f"{org_chunk.name}_split_{idx}",content=sep.join(sentences),type=org_chunk.type,chunk_size=chunk_size,window_length=window_length,**org_chunk.kwargs,)output.append(chunk)return output
- strict_length_chunk:嚴格按長度將內容拆分為塊,而不保留句子邊界:
def strict_length_chunk(self,org_chunk: Chunk,) -> List[Chunk]:content = org_chunk.contenttotal_length = len(content)output = []position = 0chunk_index = 0while position < total_length:# 計算當前chunk的內容chunk_content = content[position : position + self.split_length]# 創建新的Chunk對象chunk = Chunk(id=generate_hash_id(f"{org_chunk.id}#{chunk_index}"),name=f"{org_chunk.name}_split_{chunk_index}",content=chunk_content,type=org_chunk.type,chunk_size=self.split_length,window_length=self.window_length,**org_chunk.kwargs,)output.append(chunk)# 更新位置和索引position = position + self.split_length - self.window_lengthchunk_index += 1# 確保不會出現負索引if position < 0:position = 0return output
- _invoke:根據指定的長度和窗口大小調用輸入塊的拆分:
- 參數:
- input (Chunk):要拆分的chunk;
- ?**kwargs:其他關鍵字參數,當前未使用,但保留以備將來擴展。
- 返回: List[Output],拆分作生成的Chunk對象列表。
- 參數:
def _invoke(self, input: Chunk, **kwargs) -> List[Output]:cutted = []if isinstance(input, list):for item in input:cutted.extend(self.slide_window_chunk(item, self.split_length, self.window_length))else:cutted.extend(self.slide_window_chunk(input, self.split_length, self.window_length))return cutted
3.3?extractor
extractor用于從文本中提取實體、關系和事件,并構建知識圖譜的子圖。
下面是基于架構約束extractor的相關代碼,其中:
- 架構約束extractor的功能:執行知識提取以實施架構約束,包括實體、事件及其邊緣,實體和事件的類型及其各自的屬性會自動從項目的架構繼承;
- __init__:構造函數,初始化實例;
- input_types:輸入類型為splitter劃分的文本塊;
- output_types:輸出類型為子圖;
@ExtractorABC.register("schema_constraint")
@ExtractorABC.register("schema_constraint_extractor")
class SchemaConstraintExtractor(ExtractorABC):def __init__(self,llm: LLMClient,ner_prompt: PromptABC = None,std_prompt: PromptABC = None,relation_prompt: PromptABC = None,event_prompt: PromptABC = None,external_graph: ExternalGraphLoaderABC = None,):super().__init__()self.llm = llmself.schema = SchemaClient(host_addr=KAG_PROJECT_CONF.host_addr, project_id=KAG_PROJECT_CONF.project_id).load()self.ner_prompt = ner_promptself.std_prompt = std_promptself.relation_prompt = relation_promptself.event_prompt = event_promptbiz_scene = KAG_PROJECT_CONF.biz_sceneif self.ner_prompt is None:self.ner_prompt = init_prompt_with_fallback("ner", biz_scene)if self.std_prompt is None:self.std_prompt = init_prompt_with_fallback("std", biz_scene)self.external_graph = external_graph@propertydef input_types(self) -> Type[Input]:return Chunk@propertydef output_types(self) -> Type[Output]:return SubGraph
- _named_entity_recognition_llm:調用語言模型,執行命名實體識別:
- 參數:passage,要處理的文本內容;
- 返回值:ner_result:命名實體識別的結果。
def _named_entity_recognition_llm(self, passage: str):ner_result = self.llm.invoke({"input": passage}, self.ner_prompt, with_except=False)return ner_result
- _named_entity_recognition_process:對命名實體識別的結果進行去重和整合:
- 參數:
- passage:原始文本內容;
- ner_resulet:命名實體識別的結果。
- 返回值:output,處理后的實體列表,包含唯一的實體信息。
- 參數:
def _named_entity_recognition_process(self, passage, ner_result):if self.external_graph:extra_ner_result = self.external_graph.ner(passage)else:extra_ner_result = []output = []dedup = set()for item in extra_ner_result:name = item.nameif name not in dedup:dedup.add(name)output.append({"name": name,"category": item.label,"properties": item.properties,})for item in ner_result:name = item.get("name", None)category = item.get("category", None)if name is None or category is None:continueif not isinstance(name, str):continueif name not in dedup:dedup.add(name)output.append(item)return output
- named_entity_recognition:對給定的文本段落執行命名實體識別:
- 參數: passage (str),要對其執行命名實體識別的文本;
- 返回值: 命名實體識別作的結果。
@retry(stop=stop_after_attempt(3),wait=wait_exponential(multiplier=10, max=60),reraise=True,)def named_entity_recognition(self, passage: str):ner_result = self._named_entity_recognition_llm(passage)return self._named_entity_recognition_process(passage, ner_result)
- named_entity_standardization:對給定的文本段落和實體執行命名實體標準化:
- 參數:
- passage (str):文本段落;
- entities (List[Dict]):要標準化的實體列表。
- 返回值:命名實體標準化作的結果。
- 參數:
@retry(stop=stop_after_attempt(3),wait=wait_exponential(multiplier=10, max=60),reraise=True,)def named_entity_standardization(self, passage: str, entities: List[Dict]):return self.llm.invoke({"input": passage, "named_entities": entities},self.std_prompt,with_except=False,)
- relations_extraction:對給定的文本段落和實體執行關系提取:
- 參數:
- passage (str):文本段落;
- entities (List[Dict]):實體列表。
- 返回值:?關系提取作的結果。
- 參數:
@retry(stop=stop_after_attempt(3),wait=wait_exponential(multiplier=10, max=60),reraise=True,)def relations_extraction(self, passage: str, entities: List[Dict]):if self.relation_prompt is None:logger.debug("Relation extraction prompt not configured, skip.")return []return self.llm.invoke({"input": passage, "entity_list": entities},self.relation_prompt,with_except=False,)
- event_extraction:?對給定的文本段落執行事件提取:
- 參數:passage (str),文本段落;
- 返回:事件提取作的結果。
@retry(stop=stop_after_attempt(3),wait=wait_exponential(multiplier=10, max=60),reraise=True,)def event_extraction(self, passage: str):if self.event_prompt is None:logger.debug("Event extraction prompt not configured, skip.")return []return self.llm.invoke({"input": passage}, self.event_prompt, with_except=False)
- parse_nodes_and_edges:從實體列表中解析節點和邊:
- 參數:entities (List[Dict]),實體列表;
- 返回:Tuple[List[Node], List[Edge]],解析的節點和邊。
def parse_nodes_and_edges(self, entities: List[Dict], category: str = None):graph = SubGraph([], [])entities = copy.deepcopy(entities)root_nodes = []for record in entities:if record is None:continueif isinstance(record, str):record = {"name": record}s_name = record.get("name", "")s_label = record.get("category", category)properties = record.get("properties", {})# 有時,名稱和/或標簽會放置在屬性中。if not s_name:s_name = properties.pop("name", "")if not s_label:s_label = properties.pop("category", "")if not s_name or not s_label:continues_name = processing_phrases(s_name)root_nodes.append((s_name, s_label))tmp_properties = copy.deepcopy(properties)spg_type = self.schema.get(s_label)for prop_name, prop_value in properties.items():if prop_value is None:tmp_properties.pop(prop_name)continueif prop_name in spg_type.properties:prop_schema = spg_type.properties.get(prop_name)o_label = prop_schema.object_type_name_enif o_label not in BASIC_TYPES:# 彈出并將屬性轉換為節點和邊if not isinstance(prop_value, list):prop_value = [prop_value](new_root_nodes,new_nodes,new_edges,) = self.parse_nodes_and_edges(prop_value, o_label)graph.nodes.extend(new_nodes)graph.edges.extend(new_edges)# 將當前節點連接到屬性生成的節點for node in new_root_nodes:graph.add_edge(s_id=s_name,s_label=s_label,p=prop_name,o_id=node[0],o_label=node[1],)tmp_properties.pop(prop_name)record["properties"] = tmp_properties# 注意:對于轉換為節點/邊的屬性,我們會保留原始屬性值的副本。graph.add_node(id=s_name, name=s_name, label=s_label, properties=properties)if "official_name" in record:official_name = processing_phrases(record["official_name"])if official_name != s_name:graph.add_node(id=official_name,name=official_name,label=s_label,properties=dict(properties),)graph.add_edge(s_id=s_name,s_label=s_label,p="OfficialName",o_id=official_name,o_label=s_label,)return root_nodes, graph.nodes, graph.edges
- add_relations_to_graph:根據關系和實體列表向子圖添加邊:
- 參數:
- sub_graph (SubGraph):要向其添加邊的子圖;
- entities (List[Dict]): 實體列表,用于查找類別信息;
- relations (List[list]): 一個關系列表,每個關系代表要添加到子圖中的一個關系。
- ?返回:構造的子圖。
- 參數:
@staticmethoddef add_relations_to_graph(sub_graph: SubGraph, entities: List[Dict], relations: List[list]):for rel in relations:if len(rel) != 5:continues_name, s_category, predicate, o_name, o_category = rels_name = processing_phrases(s_name)sub_graph.add_node(s_name, s_name, s_category)o_name = processing_phrases(o_name)sub_graph.add_node(o_name, o_name, o_category)edge_type = to_camel_case(predicate)if edge_type:sub_graph.add_edge(s_name, s_category, edge_type, o_name, o_category)return sub_graph
- add_chunk_to_graph:將 Chunk 對象與子圖關聯,將其添加為節點并將其與現有節點連接:
- 參數:
- sub_graph (SubGraph):要向其添加數據塊信息的子圖;
- chunk (Chunk):包含文本和元數據的 chunk 對象。
- 返回:構造的子圖。
- 參數:
@staticmethoddef add_chunk_to_graph(sub_graph: SubGraph, chunk: Chunk):for node in sub_graph.nodes:sub_graph.add_edge(node.id, node.label, "source", chunk.id, CHUNK_TYPE)sub_graph.add_node(id=chunk.id,name=chunk.name,label=CHUNK_TYPE,properties={"id": chunk.id,"name": chunk.name,"content": f"{chunk.name}\n{chunk.content}",**chunk.kwargs,},)sub_graph.id = chunk.idreturn sub_graph
- assemble_subgraph:從給定的 chunk、entities、events 和 relations 中組裝一個 subgraph:
- 參數:
- chunk (Chunk):塊對象;
- entities (List[Dict]):實體列表;
- events (List[Dict]): 事件列表。
- 返回:構造的子圖。
- 參數:
def assemble_subgraph(self,chunk: Chunk,entities: List[Dict],relations: List[list],events: List[Dict],):graph = SubGraph([], [])_, entity_nodes, entity_edges = self.parse_nodes_and_edges(entities)graph.nodes.extend(entity_nodes)graph.edges.extend(entity_edges)_, event_nodes, event_edges = self.parse_nodes_and_edges(events)graph.nodes.extend(event_nodes)graph.edges.extend(event_edges)self.add_relations_to_graph(graph, entities, relations)self.add_chunk_to_graph(graph, chunk)return graph
- append_official_name:將正式名稱附加到實體:
- 參數:
- source_entities (List[Dict]):源實體的列表;
- entities_with_official_name (List[Dict]):具有正式名稱的實體列表。
- 參數:
def append_official_name(self, source_entities: List[Dict], entities_with_official_name: List[Dict]):tmp_dict = {}for tmp_entity in entities_with_official_name:name = tmp_entity["name"]category = tmp_entity["category"]official_name = tmp_entity["official_name"]key = f"{category}{name}"tmp_dict[key] = official_namefor tmp_entity in source_entities:name = tmp_entity["name"]category = tmp_entity["category"]key = f"{category}{name}"if key in tmp_dict:official_name = tmp_dict[key]tmp_entity["official_name"] = official_name
- postprocess_graph:通過合并具有相同名稱和標簽的節點來對圖形進行后處理:
- 參數:graph (SubGraph),要后處理的圖形;
- 返回:后處理圖。
def postprocess_graph(self, graph):try:all_node_properties = {}for node in graph.nodes:id_ = node.idname = node.namelabel = node.labelkey = (id_, name, label)if key not in all_node_properties:all_node_properties[key] = node.propertieselse:all_node_properties[key].update(node.properties)new_graph = SubGraph([], [])for key, node_properties in all_node_properties.items():id_, name, label = keynew_graph.add_node(id=id_, name=name, label=label, properties=node_properties)new_graph.edges = graph.edgesreturn new_graphexcept:return graph
- _invoke:在給定的輸入上調用提取器:
- 參數:
- input (Input):輸入數據;
- **kwargs:其他關鍵字參數。
- 返回:?List[Output],輸出結果的列表。
- 參數:
def _invoke(self, input: Input, **kwargs) -> List[Output]:title = input.namepassage = title + "\n" + input.contentout = []entities = self.named_entity_recognition(passage)events = self.event_extraction(passage)named_entities = []for entity in entities:named_entities.append({"name": entity["name"], "category": entity["category"]})relations = self.relations_extraction(passage, named_entities)std_entities = self.named_entity_standardization(passage, named_entities)self.append_official_name(entities, std_entities)subgraph = self.assemble_subgraph(input, entities, relations, events)out.append(self.postprocess_graph(subgraph))logger.debug(f"input passage:\n{passage}")logger.debug(f"output graphs:\n{out}")return out
其中,_named_entity_recognition_llm、named_entity_recognition、named_entity_standardization、relations_extraction、event_extraction、_invoke等方法除了上述的同步方法外,還有對應的異步方法。
3.4?vectorizer
vectorizer用于為知識圖譜中的節點屬性生成嵌入向量,可以增強知識圖譜的語義表示,為后續的查詢和推理提供支持。
其中,BatchVectorizer用于為SubGraph中的節點屬性批量生成嵌入向量的類,通過批量處理提高了效率,并支持同步和異步操作模式。
BatchVectorizer屬性:
- project_id (int):與SubGraph關聯的工程的ID;
- vec_meta (defaultdict):SubGraph中向量字段的元數據;
- vectorize_model (VectorizeModelABC):用于生成嵌入向量的模型;
- batch_size (int):處理節點的批處理的大小,默認值為 32。
@VectorizerABC.register("batch")
@VectorizerABC.register("batch_vectorizer")
class BatchVectorizer(VectorizerABC):def __init__(self,vectorize_model: VectorizeModelABC,batch_size: int = 32,disable_generation: Optional[List[str]] = None,):super().__init__()self.project_id = KAG_PROJECT_CONF.project_id# self._init_graph_store()self.vec_meta = self._init_vec_meta()self.vectorize_model = vectorize_modelself.batch_size = batch_sizeself.disable_generation = disable_generationdef _init_vec_meta(self):vec_meta = defaultdict(list)schema_client = SchemaClient(host_addr=KAG_PROJECT_CONF.host_addr, project_id=self.project_id)spg_types = schema_client.load()for type_name, spg_type in spg_types.items():for prop_name, prop in spg_type.properties.items():if prop_name == "name" or prop.index_type in [# if prop.index_type in [IndexTypeEnum.Vector,IndexTypeEnum.TextAndVector,]:vec_meta[type_name].append(get_vector_field_name(prop_name))return vec_meta
_generate_embedding_vectors:為輸入 SubGraph 中的節點生成嵌入向量:
- 參數:input_subgraph (SubGraph),要為其生成嵌入向量的 SubGraph。
- 返回:SubGraph,帶有生成的嵌入向量的修改后的 SubGraph。
@retry(stop=stop_after_attempt(3), reraise=True)def _generate_embedding_vectors(self, input_subgraph: SubGraph) -> SubGraph:node_list = []node_batch = []for node in input_subgraph.nodes:if not node.id or not node.name:continueproperties = {"id": node.id, "name": node.name}properties.update(node.properties)node_list.append((node, properties))node_batch.append((node.label, properties.copy()))generator = EmbeddingVectorGenerator(self.vectorize_model, self.vec_meta, self.disable_generation)generator.batch_generate(node_batch, self.batch_size)for (node, properties), (_node_label, new_properties) in zip(node_list, node_batch):for key, value in properties.items():if key in new_properties and new_properties[key] == value:del new_properties[key]node.properties.update(new_properties)return input_subgraph
_invoke:調用輸入 SubGraph 的嵌入向量的生成:
- 參數:
- input_subgraph (Input):要為其生成嵌入向量的 SubGraph;
- **kwargs:其他關鍵字參數,當前未使用,但保留以備將來擴展。
- 返回:List[Output],包含修改后的 SubGraph 和生成的嵌入向量的列表。
def _invoke(self, input_subgraph: Input, **kwargs) -> List[Output]:modified_input = self._generate_embedding_vectors(input_subgraph)return [modified_input]
3.5 writer
writer用于將子圖寫入知識圖譜存儲,在知識圖譜構建流程中負責最終的數據持久化。
KGWriter是用于將 SubGraph 寫入知識圖 (KG) 存儲的類。此類繼承自 SinkWriterABC,并提供將 SubGraph 寫入知識圖存儲系統的功能,它支持upsert和delete等操作:
class AlterOperationEnum(str, Enum):Upsert = "UPSERT"Delete = "DELETE"@SinkWriterABC.register("kg", as_default=True)
@SinkWriterABC.register("kg_writer", as_default=True)
class KGWriter(SinkWriterABC):def __init__(self, project_id: int = None, delete: bool = False, **kwargs):super().__init__(**kwargs)if project_id is None:self.project_id = KAG_PROJECT_CONF.project_idelse:self.project_id = project_idself.client = GraphClient(host_addr=KAG_PROJECT_CONF.host_addr, project_id=project_id)self.delete = delete@propertydef input_types(self) -> Type[Input]:return SubGraph@propertydef output_types(self) -> Type[Output]:return None
format_label:通過添加項目命名空間(如果尚不存在)來設置標簽的格式:
- 參數:?label (str),需要格式化的標簽;
- 返回:?str,格式化的標簽。
def format_label(self, label: str):namespace = KAG_PROJECT_CONF.namespaceif label.split(".")[0] == namespace:return labelreturn f"{namespace}.{label}"
-
standarlize_graph:標準化子圖,格式化標簽并將非字符串屬性轉換為JSON字符串:
def standarlize_graph(self, graph):for node in graph.nodes:node.label = self.format_label(node.label)for edge in graph.edges:edge.from_type = self.format_label(edge.from_type)edge.to_type = self.format_label(edge.to_type)for node in graph.nodes:for k, v in node.properties.items():if k.startswith("_"):continueif not isinstance(v, str):node.properties[k] = json.dumps(v, ensure_ascii=False)for edge in graph.edges:for k, v in edge.properties.items():if k.startswith("_"):continueif not isinstance(v, str):edge.properties[k] = json.dumps(v, ensure_ascii=False)return graph
_invoke:在圖形存儲上調用指定的作(upsert 或 delete)。
- 參數:
- input (Input):表示要作的子圖的 input 對象;
- alter_operation (str):要執行的作類型(Upsert 或 Delete),默認為 Upsert;
- lead_to_builder (bool):啟用潛在客戶到事件推斷生成器,默認為 False。
- 返回:List[Output],輸出對象列表(當前始終為 [None])。
def _invoke(self,input: Input,alter_operation: str = AlterOperationEnum.Upsert,lead_to_builder: bool = False,**kwargs,) -> List[Output]:if self.delete:self.client.write_graph(sub_graph=input.to_dict(),operation=AlterOperationEnum.Delete,lead_to_builder=lead_to_builder,)else:input = self.standarlize_graph(input)logger.debug(f"final graph to write: {input}")self.client.write_graph(sub_graph=input.to_dict(),operation=alter_operation,lead_to_builder=lead_to_builder,)return [input]
invoke:處理輸入數據并調用_invoke執行寫入操作,支持檢查點功能:
def invoke(self, input: Input, **kwargs) -> List[Union[Output, BuilderComponentData]]:if isinstance(input, BuilderComponentData):input_data = input.datainput_key = input.hash_keyelse:input_data = inputinput_key = Noneif self.inherit_input_key:output_key = input_keyelse:output_key = Nonewrite_ckpt = kwargs.get("write_ckpt", True)if write_ckpt and self.checkpointer:# found existing data in checkpointerif input_key and self.checkpointer.exists(input_key):return []# not foundoutput = self._invoke(input_data, **kwargs)# We only record the data key to avoid embeddings from taking up too much disk space.if input_key:self.checkpointer.write_to_ckpt(input_key, input_key)return [BuilderComponentData(x, output_key) for x in output]else:output = self._invoke(input_data, **kwargs)return [BuilderComponentData(x, output_key) for x in output]
_handle:為 SPGServer 提供的調用接口:
- 參數:
- input (Dict):表示要作的子圖的輸入字典;
- alter_operation (str):要執行的作類型(Upsert 或 Delete);
- **kwargs:其他關鍵字參數。
- 返回:None,此方法當前返回 None。
def _handle(self, input: Dict, alter_operation: str, **kwargs):_input = self.input_types.from_dict(input)_output = self.invoke(_input, alter_operation) # noqareturn None
3.6 可選組件
除了上述的常用組件外,KAG-Builder還提供一些可選組件,例如:
- post_processor:對構建好的知識圖譜進行后處理,如數據清洗、格式轉換、質量評估等;
- mapping:實現不同數據源、不同格式之間的映射和轉換;
- scanner:對數據進行快速掃描和預處理,識別數據中的關鍵信息和結構;
- external_graph:用于與外部知識圖譜進行交互和融合,可以導入或導出知識圖譜數據。
4 示例
如下圖所示:?
當給KAG一個txt的文本文件,
- 首先,reader將這個文件讀入chunk中,例如:
亞伯拉罕·林肯采取了溫和的立場,旨在盡快使南方回歸聯邦,而國會中的激進共和黨人則尋求更強有力的措施來提升非裔美國人的權利。共和黨重新奪回了他們在2006年中期選舉中失去的眾議院控制權,凈增了63個席位。
- splitter再將這個大chunk,分為更小的chunk:
亞伯拉罕·林肯采取了溫和的立場,旨在盡快使南方回歸聯邦,而國會中的激進共和黨人則尋求更強有力的措施來提升非裔美國人的權利。
共和黨重新奪回了他們在2006年中期選舉中失去的眾議院控制權,凈增了63個席位。
- 然后,由extractor進行信息提取,從切分后的文本中提取實體、關系等知識圖譜的核心元素:
- 再由aligner進行知識對齊,將提取器結果與語義架構標準化對齊,包括文檔語義對齊和概念語義圖對齊:
- 最后,vectorizer將結果向量化,再由writer進行圖譜寫入,將子圖寫入圖存儲數據庫:
總結
通過對 Schema、Prompt 以及 KAG-Builder 的學習,我對知識圖譜構建過程中的數據定義、語言模型引導和構建流程管理有了更深入的理解:
- Schema是定義知識圖譜語義框架和數據組織結構的核心組件,Schema 是知識圖譜的架構定義,它描述了實體和關系的類型及其屬性。
- Prompt 是自然語言處理任務中用于引導模型生成符合預期輸出的模板。在 KAG項目中,Prompt 被廣泛應用于各種任務,如命名實體識別(NER)、關系抽取、事件抽取等。通過設計合適的 Prompt,可以有效地指導語言模型理解任務要求并生成準確的結構化輸出。
- KAG-Builder 是 KAG 項目中用于構建知識圖譜的核心組件,它由多個功能模塊組成,包括 Reader(讀取器)、Splitter(切分器)、Extractor(提取器)、Vectorizer(向量化器)、PostProcessor(后處理器)和 Writer(寫入器)。這些模塊通過構建鏈(Builder Chain)連接成一個完整的工作流程,負責從數據讀取到知識圖譜構建的全過程。