環境依賴
jdk、neo4j圖數據庫
操作一條數據完整demo
import os,json,sys,io
from py2neo import Graph, Node

# Re-wrap the standard streams as UTF-8 so the Chinese progress messages are
# encoded as UTF-8 regardless of the console's default encoding.
try:
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
except (AttributeError, ValueError):
    # Best effort: a replaced stream may be None or expose no usable binary
    # ``buffer``; in that case keep the streams exactly as they are.
    pass


class MedicalGraph:
    """Load ONE disease record from a JSON file and write it to Neo4j.

    The data file is expected to contain a single JSON object describing one
    disease. From it the class creates Disease, Drug and Department nodes and
    the relationships between them, and can export the node names to text
    files.
    """

    def __init__(self, data_path=r'D:\skstudy\medical2.json',
                 uri='bolt://110.110.110.110:7110', auth=('110', 'neo4j110')):
        """Remember the data file and connect to the graph database.

        Args:
            data_path: Path of the JSON data file.
            uri: Bolt URI of the Neo4j server.
            auth: ``(user, password)`` tuple passed to ``py2neo.Graph``.

        Raises:
            FileNotFoundError: If ``data_path`` does not exist.
        """
        self.data_path = data_path
        if not os.path.exists(self.data_path):
            raise FileNotFoundError(f"數據文件未找到: {self.data_path}")
        self.g = Graph(uri, auth=auth)

    def read_nodes(self):
        """Parse the data file into node names and relationship pairs.

        Returns:
            A 7-tuple ``(diseases, drugs, departments, disease_infos,
            rels_disease_drug, rels_disease_department,
            rels_department_department)``. The first three items are sets of
            names, ``disease_infos`` is a list of dicts holding the Disease
            node properties, and the last three are lists of
            ``[start_name, end_name]`` pairs.

        Raises:
            KeyError: If the record has no ``name`` field.
        """
        diseases = []                    # disease names
        drugs = []                       # drug names
        departments = []                 # department names
        disease_infos = []               # per-disease property dicts
        rels_disease_drug = []           # disease -> drug
        rels_disease_department = []     # disease -> department
        rels_department_department = []  # department -> parent department

        # The file holds one JSON object (a single disease record).
        with open(self.data_path, 'r', encoding='utf-8') as f:
            data_json = json.load(f)

        disease_name = data_json['name']
        diseases.append(disease_name)
        disease_dict = {
            'name': disease_name,
            'recommand_drug': [],
            'cure_department': [],
        }

        # Departments
        cure_department = data_json.get('cure_department')
        if isinstance(cure_department, list):
            disease_dict['cure_department'] = cure_department
            departments.extend(cure_department)
            if len(cure_department) == 1:
                rels_disease_department.append([disease_name, cure_department[0]])
            elif len(cure_department) >= 2:
                # The disease is attached to the second entry, and the second
                # entry is attached to the first one.
                # NOTE(review): entries from the third on become Department
                # nodes but receive no relationship - confirm this is intended.
                rels_disease_department.append([disease_name, cure_department[1]])
                rels_department_department.append(
                    [cure_department[1], cure_department[0]])

        # Recommended drugs
        recommand_drug = data_json.get('recommand_drug')
        if isinstance(recommand_drug, list):
            disease_dict['recommand_drug'] = recommand_drug
            drugs.extend(recommand_drug)
            for drug in recommand_drug:
                rels_disease_drug.append([disease_name, drug])

        disease_infos.append(disease_dict)

        # Names are de-duplicated by converting the lists to sets.
        return (set(diseases), set(drugs), set(departments), disease_infos,
                rels_disease_drug, rels_disease_department,
                rels_department_department)

    def create_node(self, label, nodes):
        """Merge one node per non-empty name in ``nodes`` under ``label``."""
        count = 0
        for node_name in nodes:
            if not node_name:  # skip empty names
                continue
            node = Node(label, name=node_name)
            # merge (instead of create) avoids creating duplicates
            self.g.merge(node, label, 'name')
            count += 1
            if count % 100 == 0:
                print(f"{label} 節點創建: {count}/{len(nodes)}")
        print(f"? {label} 節點創建完成,共 {count} 個")

    def create_diseases_nodes(self, disease_infos):
        """Merge one Disease node, with its list properties, per info dict."""
        count = 0
        for disease_dict in disease_infos:
            node = Node(
                'Disease',
                name=disease_dict['name'],
                recommand_drug=disease_dict['recommand_drug'],
                cure_department=disease_dict['cure_department'],
            )
            self.g.merge(node, 'Disease', 'name')
            count += 1
            if count % 100 == 0:
                print(f"疾病節點創建: {count}")
        print(f"? 疾病節點創建完成,共 {count} 個")

    def create_graphnodes(self):
        """Create the Disease, Drug and Department nodes from the data file."""
        _, drugs, departments, disease_infos, _, _, _ = self.read_nodes()
        self.create_diseases_nodes(disease_infos)
        self.create_node('Drug', drugs)
        self.create_node('Department', departments)

    def create_relationship(self, start_label, end_label, edges, rel_type, rel_name):
        """Merge a ``rel_type`` relationship for every distinct edge.

        Args:
            start_label: Label of the start nodes.
            end_label: Label of the end nodes.
            edges: Iterable of ``[start_name, end_name]`` pairs.
            rel_type: Relationship type used in the Cypher statement.
            rel_name: Value stored in the relationship's ``name`` property.
        """
        # De-duplicate on (start, end) tuples rather than on a joined string,
        # so a name that contains the separator can never be split wrongly.
        unique_edges = {tuple(edge) for edge in edges}
        total = len(unique_edges)

        # Labels and the relationship type are formatted into the statement;
        # every value is passed as a query parameter, which avoids quoting
        # and injection problems. The statement is the same for all edges.
        query = (
            "MATCH (p:%s {name: $p_name}), (q:%s {name: $q_name}) "
            "MERGE (p)-[rel:%s {name: $rel_name}]->(q)"
        ) % (start_label, end_label, rel_type)

        count = 0  # statements that ran without raising
        for p_name, q_name in unique_edges:
            if not p_name or not q_name:
                continue
            try:
                self.g.run(query, p_name=p_name, q_name=q_name, rel_name=rel_name)
                count += 1
                if count % 100 == 0:
                    print(f"{rel_name} 關系創建: {count}/{total}")
            except Exception as e:
                # One failing edge must not abort the remaining ones.
                print(f"創建關系失敗: {e}, 邊: {p_name} -> {q_name}")
        print(f"? {rel_name} 關系創建完成,共 {count} 個")

    def create_graphrels(self):
        """Create all relationships derived from the data file."""
        (_, _, _, _, rels_disease_drug, rels_disease_department,
         rels_department_department) = self.read_nodes()
        self.create_relationship(
            'Disease', 'Drug', rels_disease_drug, 'RECOMMAND_EAT', '宜吃')
        self.create_relationship(
            'Disease', 'Department', rels_disease_department, 'BELONGS_TO', '所屬科室')
        self.create_relationship(
            'Department', 'Department', rels_department_department, 'BELONGS_TO', '屬于')

    def export_data(self):
        """Write the sorted node names to disease/drug/department .txt files."""
        diseases, drugs, departments, _, _, _, _ = self.read_nodes()
        exports = [
            ('disease.txt', diseases),
            ('drug.txt', drugs),
            ('department.txt', departments),
        ]
        for filename, data in exports:
            with open(filename, 'w', encoding='utf-8') as f:
                f.write('\n'.join(sorted(data)))
            print(f"? 已導出 {filename}")


if __name__ == '__main__':
    medical_graph = MedicalGraph()
    medical_graph.create_graphnodes()
    medical_graph.create_graphrels()
    medical_graph.export_data()
運行看下情況
使用的json模板
{"_id": {"$oid": "5bb578b6831b973a137e3ee7"},"name": "慢性阻塞性肺疾病","desc": "慢性阻塞性肺疾病(COPD)是一種常見的以持續性氣流受限為特征的呼吸系統疾病,主要由長期吸煙、空氣污染或職業粉塵暴露引起,表現為慢性咳嗽、咳痰和進行性呼吸困難。","category": ["疾病百科", "內科", "呼吸內科"],"prevent": "1、戒煙是預防COPD最重要的措施。\n2、避免接觸工業粉塵、煙霧和空氣污染物。\n3、定期接種流感疫苗和肺炎疫苗。","cause": "主要病因包括長期吸煙、吸入有害氣體或顆粒(如煤煙、粉塵)、遺傳因素(如α1-抗胰蛋白酶缺乏)、反復呼吸道感染等。吸煙是導致COPD最常見且可預防的原因。","symptom": ["咳嗽", "咳痰", "呼吸困難", "喘息", "胸悶"],"yibao_status": "是","get_prob": "約0.3%","get_way": "無傳染性","acompany": ["肺心病", "自發性氣胸", "呼吸衰竭"],"cure_department": ["內科", "呼吸內科"],"cure_way": ["藥物治療", "氧療", "肺康復訓練"],"cure_lasttime": "長期管理,需終身控制","cured_prob": "不可完全治愈,但可控制病情","cost_money": "年均費用約5000-20000元,視病情嚴重程度而定","check": ["肺功能檢查", "胸部X線", "血氣分析", "高分辨率CT"],"recommand_drug": ["沙美特羅替卡松粉吸入劑", "噻托溴銨", "布地奈德福莫特羅"],"drug_detail": ["沙美特羅替卡松:每日兩次,用于緩解支氣管痙攣","噻托溴銨:長效抗膽堿藥,改善肺功能","布地奈德福莫特羅:控制炎癥與擴張支氣管聯合用藥"]}
查詢下neo4j庫中信息
確實,只顯示了一個節點的信息。還需要再修改:將json的內容通過大模型擴寫一下,豐富內容。
通過大模型擴容后的json文件,后續如果是其他數據的json文件,都可以按照這個模板去創建節點,創建關系。
操作多條數據完美demo
json文件
[{"_id": {"$oid": "5bb578b6831b973a137e3ee7"},"name": "慢性阻塞性肺疾病","desc": "慢性阻塞性肺疾病(COPD)是一種常見的以持續性氣流受限為特征的呼吸系統疾病,主要由長期吸煙、空氣污染或職業粉塵暴露引起,表現為慢性咳嗽、咳痰和進行性呼吸困難。","category": ["疾病百科", "內科", "呼吸內科"],"prevent": "1、戒煙是預防COPD最重要的措施。\n2、避免接觸工業粉塵、煙霧和空氣污染物。\n3、定期接種流感疫苗和肺炎疫苗。","cause": "主要病因包括長期吸煙、吸入有害氣體或顆粒(如煤煙、粉塵)、遺傳因素(如α1-抗胰蛋白酶缺乏)、反復呼吸道感染等。吸煙是導致COPD最常見且可預防的原因。","symptom": ["咳嗽", "咳痰", "呼吸困難", "喘息", "胸悶"],"yibao_status": "是","get_prob": "約0.3%","get_way": "無傳染性","acompany": ["肺心病", "自發性氣胸", "呼吸衰竭"],"cure_department": ["內科", "呼吸內科"],"cure_way": ["藥物治療", "氧療", "肺康復訓練"],"cure_lasttime": "長期管理,需終身控制","cured_prob": "不可完全治愈,但可控制病情","cost_money": "年均費用約5000-20000元,視病情嚴重程度而定","check": ["肺功能檢查", "胸部X線", "血氣分析", "高分辨率CT"],"recommand_drug": ["沙美特羅替卡松粉吸入劑", "噻托溴銨", "布地奈德福莫特羅"],"drug_detail": ["沙美特羅替卡松:每日兩次,用于緩解支氣管痙攣","噻托溴銨:長效抗膽堿藥,改善肺功能","布地奈德福莫特羅:控制炎癥與擴張支氣管聯合用藥"]},{"_id": {"$oid": "5bb578b6831b973a137e3ee8"},"name": "支氣管哮喘","desc": "支氣管哮喘是一種慢性氣道炎癥性疾病,特征為可逆性氣流受限、氣道高反應性和反復發作的喘息、呼吸困難、胸悶和咳嗽,尤其在夜間或清晨加重。","category": ["疾病百科", "內科", "呼吸內科"],"prevent": "1、避免接觸過敏原(如花粉、塵螨、寵物皮屑)。\n2、保持室內空氣流通,控制濕度。\n3、避免劇烈運動和冷空氣刺激。","cause": "與遺傳易感性、環境因素(如過敏原、空氣污染)、呼吸道病毒感染、職業性刺激物暴露等有關。免疫系統異常激活導致氣道慢性炎癥。","symptom": ["喘息", "呼吸困難", "胸悶", "咳嗽", "夜間憋醒"],"yibao_status": "是","get_prob": "約1%-3%","get_way": "無傳染性","acompany": ["肺氣腫", "呼吸衰竭", "焦慮障礙"],"cure_department": ["內科", "呼吸內科"],"cure_way": ["吸入性藥物治療", "脫敏治療", "生活方式干預"],"cure_lasttime": "長期控制,部分兒童可緩解","cured_prob": "約30%兒童可臨床治愈,成人多為控制","cost_money": "年均2000-10000元,取決于用藥方案","check": ["肺功能檢查", "呼出氣一氧化氮檢測", "過敏原測試"],"recommand_drug": ["丙酸氟替卡松", "沙丁胺醇", "孟魯司特鈉"],"drug_detail": ["丙酸氟替卡松:每日吸入,控制氣道炎癥","沙丁胺醇:急救用支氣管擴張劑","孟魯司特鈉:口服抗炎藥,適用于過敏性哮喘"]},{"_id": {"$oid": "5bb578b6831b973a137e3ee9"},"name": "肺炎","desc": "肺炎是指終末氣道、肺泡和肺間質的炎癥,可由細菌、病毒、真菌或非典型病原體引起,常見癥狀包括發熱、咳嗽、咳痰和呼吸困難。","category": ["疾病百科", "內科", "呼吸內科"],"prevent": "1、接種肺炎球菌疫苗和流感疫苗。\n2、增強體質,避免受涼感冒。\n3、注意個人衛生,勤洗手,戴口罩。","cause": "常見病原體包括肺炎鏈球菌、流感嗜血桿菌、支原體、病毒(如流感病毒、新冠病毒)等。機體免疫力下降時易發生感染。","symptom": ["發熱", "咳嗽", "咳痰", "胸痛", 
"呼吸困難"],"yibao_status": "是","get_prob": "每年約1%-2%","get_way": "可通過飛沫傳播","acompany": ["胸腔積液", "敗血癥", "急性呼吸窘迫綜合征"],"cure_department": ["內科", "呼吸內科"],"cure_way": ["抗生素治療", "抗病毒治療", "支持治療"],"cure_lasttime": "輕癥約1-2周,重癥可達4周以上","cured_prob": "約90%以上可治愈","cost_money": "普通住院約5000-15000元","check": ["胸部X光", "血常規", "痰培養", "C反應蛋白"],"recommand_drug": ["阿莫西林克拉維酸", "左氧氟沙星", "頭孢曲松"],"drug_detail": ["阿莫西林克拉維酸:廣譜抗生素,用于社區獲得性肺炎","左氧氟沙星:針對革蘭陰性菌有效","頭孢曲松:靜脈用藥,重癥常用"]},{"_id": {"$oid": "5bb578b6831b973a137e3eea"},"name": "肺結核","desc": "肺結核是由結核分枝桿菌引起的慢性傳染病,主要侵犯肺部,表現為咳嗽、咳痰、咯血、低熱、盜汗、乏力等癥狀,具有較強傳染性。","category": ["疾病百科", "內科", "呼吸內科"],"prevent": "1、接種卡介苗(BCG)。\n2、避免與活動性肺結核患者密切接觸。\n3、保持良好通風環境,增強免疫力。","cause": "由結核分枝桿菌感染引起,通過空氣飛沫傳播。當人體抵抗力降低時,潛伏菌可重新活躍致病。","symptom": ["咳嗽", "咳痰", "咯血", "低熱", "盜汗", "體重下降"],"yibao_status": "是","get_prob": "中國年發病率約0.06%","get_way": "通過呼吸道飛沫傳播","acompany": ["結核性胸膜炎", "肺空洞", "播散性結核"],"cure_department": ["內科", "呼吸內科"],"cure_way": ["抗結核化療", "隔離治療", "營養支持"],"cure_lasttime": "至少6-9個月","cured_prob": "規范治療下治愈率可達90%以上","cost_money": "國家免費提供一線藥物,自費部分約1000-5000元","check": ["PPD試驗", "T-SPOT檢測", "胸部CT", "痰涂片找抗酸桿菌"],"recommand_drug": ["異煙肼", "利福平", "乙胺丁醇", "吡嗪酰胺"],"drug_detail": ["異煙肼:殺菌主力,需監測肝功能","利福平:強效殺菌,可能導致體液變紅","乙胺丁醇:防止耐藥,注意視力變化","吡嗪酰胺:早期殺菌作用強"]},{"_id": {"$oid": "5bb578b6831b973a137e3eeb"},"name": "間質性肺疾病","desc": "間質性肺疾病是一組以肺間質炎癥和纖維化為主要表現的異質性疾病群,病因多樣,進展緩慢,最終可導致肺功能嚴重受損。","category": ["疾病百科", "內科", "呼吸內科"],"prevent": "1、避免接觸粉塵、石棉、鳥糞等職業或環境致病因素。\n2、戒煙。\n3、及時治療自身免疫性疾病。","cause": "包括特發性肺纖維化、結締組織病相關間質性肺病、藥物或放射線損傷、塵肺等。確切機制涉及慢性炎癥與異常修復過程。","symptom": ["干咳", "進行性呼吸困難", "乏力", "杵狀指"],"yibao_status": "部分納入","get_prob": "約0.02%","get_way": "無傳染性","acompany": ["肺動脈高壓", "肺癌", "右心衰竭"],"cure_department": ["內科", "呼吸內科"],"cure_way": ["抗纖維化藥物", "糖皮質激素", "氧療"],"cure_lasttime": "長期治療,難以逆轉","cured_prob": "約10%-20%病情穩定,多數緩慢進展","cost_money": "年均1萬-5萬元以上,抗纖維化藥昂貴","check": ["高分辨率CT", "肺功能檢查", "肺活檢", "自身抗體檢測"],"recommand_drug": ["尼達尼布", "吡非尼酮", "潑尼松"],"drug_detail": 
["尼達尼布:抑制纖維化進程","吡非尼酮:抗氧化、抗纖維化","潑尼松:用于炎癥活躍期"]},{"_id": {"$oid": "5bb578b6831b973a137e3eec"},"name": "肺癌","desc": "肺癌是起源于支氣管黏膜或肺泡上皮細胞的惡性腫瘤,是最常見的癌癥死因之一,分為小細胞肺癌和非小細胞肺癌兩大類。","category": ["疾病百科", "內科", "呼吸內科"],"prevent": "1、絕對戒煙并避免二手煙。\n2、減少廚房油煙、空氣污染暴露。\n3、定期體檢,高危人群做低劑量CT篩查。","cause": "主要危險因素包括吸煙(占80%以上)、職業致癌物(如石棉、砷)、電離輻射、遺傳易感性和空氣污染。","symptom": ["持續性咳嗽", "咯血", "胸痛", "消瘦", "聲音嘶啞"],"yibao_status": "是","get_prob": "約0.05%","get_way": "無傳染性","acompany": ["惡性胸水", "骨轉移", "腦轉移"],"cure_department": ["內科", "呼吸內科", "腫瘤科"],"cure_way": ["手術切除", "化療", "靶向治療", "免疫治療"],"cure_lasttime": "根據分期,治療周期數月至數年","cured_prob": "早期5年生存率可達60%-80%,晚期低于10%","cost_money": "治療總費用約5萬-50萬元不等","check": ["胸部CT", "PET-CT", "支氣管鏡活檢", "基因檢測"],"recommand_drug": ["吉非替尼", "奧希替尼", "帕博利珠單抗"],"drug_detail": ["吉非替尼:EGFR突變陽性患者一線用藥","奧希替尼:三代靶向藥,用于T790M突變","帕博利珠單抗:PD-1抑制劑,用于免疫治療"]},{"_id": {"$oid": "5bb578b6831b973a137e3eed"},"name": "肺栓塞","desc": "肺栓塞是由于內源性或外源性栓子堵塞肺動脈主干或分支,引起肺循環障礙的臨床綜合征,常見為下肢深靜脈血栓脫落所致。","category": ["疾病百科", "內科", "呼吸內科"],"prevent": "1、術后早期下床活動,預防深靜脈血栓。\n2、長途旅行時多活動下肢。\n3、高危人群可預防性抗凝。","cause": "最常見的栓子來自下肢深靜脈血栓形成(DVT),其他原因包括脂肪栓塞、空氣栓塞、羊水栓塞等。長期臥床、手術、腫瘤、妊娠為高危因素。","symptom": ["突發呼吸困難", "胸痛", "咯血", "心悸", "暈厥"],"yibao_status": "是","get_prob": "約0.01%","get_way": "無傳染性","acompany": ["右心衰竭", "休克", "慢性血栓栓塞性肺動脈高壓"],"cure_department": ["內科", "呼吸內科"],"cure_way": ["抗凝治療", "溶栓治療", "介入取栓"],"cure_lasttime": "急性期1-2周,抗凝治療持續3-6個月","cured_prob": "及時治療下存活率超90%","cost_money": "住院治療約2萬-8萬元","check": ["D-二聚體", "CT肺動脈造影", "下肢靜脈超聲"],"recommand_drug": ["低分子肝素", "華法林", "利伐沙班"],"drug_detail": ["低分子肝素:急性期首選抗凝藥","華法林:需監測INR,長期使用","利伐沙班:新型口服抗凝藥,使用方便"]},{"_id": {"$oid": "5bb578b6831b973a137e3eee"},"name": "睡眠呼吸暫停綜合征","desc": "睡眠呼吸暫停綜合征是一種在睡眠中反復出現呼吸暫停或低通氣的疾病,最常見為阻塞性類型,常伴有打鼾、白天嗜睡等癥狀。","category": ["疾病百科", "內科", "呼吸內科"],"prevent": "1、控制體重,避免肥胖。\n2、避免飲酒和鎮靜藥物。\n3、側臥睡眠,保持鼻腔通暢。","cause": "上氣道結構狹窄、肥胖、下頜后縮、長期吸煙飲酒、家族遺傳等因素導致睡眠時氣道塌陷。","symptom": ["打鼾", "呼吸暫停", "白天嗜睡", "晨起頭痛", "注意力不集中"],"yibao_status": "是","get_prob": "成人約2%-4%","get_way": 
"無傳染性","acompany": ["高血壓", "冠心病", "腦卒中"],"cure_department": ["內科", "呼吸內科"],"cure_way": ["持續氣道正壓通氣(CPAP)", "減重", "手術"],"cure_lasttime": "需長期管理","cured_prob": "通過治療可顯著改善癥狀,根治較難","cost_money": "CPAP設備約5000-15000元,治療費另計","check": ["多導睡眠監測(PSG)", "鼻咽喉鏡檢查", "血氧監測"],"recommand_drug": [],"drug_detail": []},{"_id": {"$oid": "5bb578b6831b973a137e3eef"},"name": "支氣管擴張癥","desc": "支氣管擴張癥是由于支氣管壁結構破壞導致其異常擴張的慢性疾病,常表現為慢性咳嗽、大量膿痰和反復咯血。","category": ["疾病百科", "內科", "呼吸內科"],"prevent": "1、積極治療兒童期呼吸道感染。\n2、接種疫苗預防麻疹、百日咳等。\n3、戒煙,避免刺激性氣體。","cause": "常見于兒童期嚴重肺部感染(如肺炎、結核)、免疫缺陷、囊性纖維化、纖毛功能障礙等。反復感染導致支氣管壁破壞。","symptom": ["慢性咳嗽", "大量膿痰", "咯血", "反復肺部感染", "杵狀指"],"yibao_status": "是","get_prob": "約0.01%","get_way": "無傳染性","acompany": ["肺膿腫", "慢性肺心病", "呼吸衰竭"],"cure_department": ["內科", "呼吸內科"],"cure_way": ["抗感染治療", "體位引流", "支氣管鏡吸痰"],"cure_lasttime": "長期慢性過程,需反復治療","cured_prob": "無法根治,但可控制癥狀","cost_money": "年均3000-10000元","check": ["高分辨率CT", "痰培養", "肺功能檢查"],"recommand_drug": ["阿莫西林克拉維酸", "左氧氟沙星", "氨溴索"],"drug_detail": ["阿莫西林克拉維酸:用于急性感染期","左氧氟沙星:覆蓋革蘭陰性菌","氨溴索:促進痰液排出"]},{"_id": {"$oid": "5bb578b6831b973a137e3ef0"},"name": "急性呼吸窘迫綜合征","desc": "急性呼吸窘迫綜合征(ARDS)是由于嚴重感染、創傷、休克等引起的急性彌漫性肺損傷,表現為嚴重低氧血癥和呼吸衰竭。","category": ["疾病百科", "內科", "呼吸內科"],"prevent": "1、及時治療原發病(如重癥肺炎、膿毒癥)。\n2、避免誤吸。\n3、合理輸血和補液。","cause": "直接肺損傷(如肺炎、吸入性肺炎)或間接損傷(如膿毒癥、嚴重創傷、胰腺炎)引發全身炎癥反應,導致肺泡-毛細血管屏障破壞。","symptom": ["嚴重呼吸困難", "呼吸急促", "紫紺", "煩躁不安", "低氧血癥"],"yibao_status": "是","get_prob": "重癥患者中約10%-15%","get_way": "無傳染性","acompany": ["多器官功能衰竭", "氣壓傷", "深靜脈血栓"],"cure_department": ["內科", "呼吸內科", "重癥醫學科"],"cure_way": ["機械通氣", "肺保護性通氣策略", "治療原發病"],"cure_lasttime": "數天至數周,部分遺留肺功能損害","cured_prob": "總體死亡率約30%-40%","cost_money": "ICU治療每日約1萬-3萬元,總費用高昂","check": ["動脈血氣分析", "胸部X光或CT", "肺力學監測"],"recommand_drug": ["哌拉西林他唑巴坦", "甲潑尼龍", "鎮靜肌松藥"],"drug_detail": ["哌拉西林他唑巴坦:廣譜抗生素,用于抗感染","甲潑尼龍:在特定階段減輕炎癥反應","鎮靜肌松藥:輔助機械通氣"]}
]
python腳本原來只讀取一條json數據,現在要修改成讀取json數組中的所有數據,再依次將每條數據解析出來,創建節點和關系。
import os
import json
import sys,io
from py2neo import Graph, Node

# Re-wrap the standard streams as UTF-8 so the Chinese progress messages are
# encoded as UTF-8 regardless of the console's default encoding.
try:
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
except (AttributeError, ValueError):
    # Best effort: a replaced stream may be None or expose no usable binary
    # ``buffer``; in that case keep the streams exactly as they are.
    pass


class MedicalGraph:
    """Load disease records from a JSON file and write them to Neo4j.

    The data file normally contains a JSON array with one object per disease;
    a file holding a single object is accepted as well. From the records the
    class creates Disease, Drug and Department nodes and the relationships
    between them, and can export the node names to text files.
    """

    def __init__(self, data_path=r'D:\skstudy\medical2.json',
                 uri='bolt://10.15.32.71:7687', auth=('neo4j', 'neo4j1234')):
        """Remember the data file and connect to the graph database.

        Args:
            data_path: Path of the JSON data file.
            uri: Bolt URI of the Neo4j server.
            auth: ``(user, password)`` tuple passed to ``py2neo.Graph``.

        Raises:
            FileNotFoundError: If ``data_path`` does not exist.
        """
        self.data_path = data_path
        if not os.path.exists(self.data_path):
            raise FileNotFoundError(f"數據文件未找到: {self.data_path}")
        self.g = Graph(uri, auth=auth)

    def read_nodes(self):
        """Parse the data file into node names and relationship pairs.

        Returns:
            A 7-tuple ``(diseases, drugs, departments, disease_infos,
            rels_disease_drug, rels_disease_department,
            rels_department_department)``. The first three items are sets of
            names, ``disease_infos`` is a list of dicts holding the Disease
            node properties, and the last three are lists of
            ``[start_name, end_name]`` pairs.

        Raises:
            KeyError: If a record has no ``name`` field.
        """
        diseases = []                    # disease names
        drugs = []                       # drug names
        departments = []                 # department names
        disease_infos = []               # per-disease property dicts
        rels_disease_drug = []           # disease -> drug
        rels_disease_department = []     # disease -> department
        rels_department_department = []  # department -> parent department

        with open(self.data_path, 'r', encoding='utf-8') as f:
            data_jsons = json.load(f)

        # A file with a single JSON object is treated as a one-element array;
        # iterating the dict directly would yield its keys, not records.
        if isinstance(data_jsons, dict):
            data_jsons = [data_jsons]

        for data_json in data_jsons:
            disease_name = data_json['name']
            diseases.append(disease_name)
            disease_dict = {
                'name': disease_name,
                'recommand_drug': [],
                'cure_department': [],
            }

            # Departments
            cure_department = data_json.get('cure_department')
            if isinstance(cure_department, list):
                disease_dict['cure_department'] = cure_department
                departments.extend(cure_department)
                if len(cure_department) == 1:
                    rels_disease_department.append(
                        [disease_name, cure_department[0]])
                elif len(cure_department) >= 2:
                    # The disease is attached to the second entry, and the
                    # second entry is attached to the first one.
                    # NOTE(review): entries from the third on become
                    # Department nodes but receive no relationship - confirm
                    # this is intended.
                    rels_disease_department.append(
                        [disease_name, cure_department[1]])
                    rels_department_department.append(
                        [cure_department[1], cure_department[0]])

            # Recommended drugs
            recommand_drug = data_json.get('recommand_drug')
            if isinstance(recommand_drug, list):
                disease_dict['recommand_drug'] = recommand_drug
                drugs.extend(recommand_drug)
                for drug in recommand_drug:
                    rels_disease_drug.append([disease_name, drug])

            disease_infos.append(disease_dict)

        # Names are de-duplicated by converting the lists to sets.
        return (set(diseases), set(drugs), set(departments), disease_infos,
                rels_disease_drug, rels_disease_department,
                rels_department_department)

    def create_node(self, label, nodes):
        """Merge one node per non-empty name in ``nodes`` under ``label``."""
        count = 0
        for node_name in nodes:
            if not node_name:  # skip empty names
                continue
            node = Node(label, name=node_name)
            # merge (instead of create) avoids creating duplicates
            self.g.merge(node, label, 'name')
            count += 1
            if count % 100 == 0:
                print(f"{label} 節點創建: {count}/{len(nodes)}")
        print(f"? {label} 節點創建完成,共 {count} 個")

    def create_diseases_nodes(self, disease_infos):
        """Merge one Disease node, with its list properties, per info dict."""
        count = 0
        for disease_dict in disease_infos:
            node = Node(
                'Disease',
                name=disease_dict['name'],
                recommand_drug=disease_dict['recommand_drug'],
                cure_department=disease_dict['cure_department'],
            )
            self.g.merge(node, 'Disease', 'name')
            count += 1
            if count % 100 == 0:
                print(f"疾病節點創建: {count}")
        print(f"? 疾病節點創建完成,共 {count} 個")

    def create_graphnodes(self):
        """Create the Disease, Drug and Department nodes from the data file."""
        _, drugs, departments, disease_infos, _, _, _ = self.read_nodes()
        self.create_diseases_nodes(disease_infos)
        self.create_node('Drug', drugs)
        self.create_node('Department', departments)

    def create_relationship(self, start_label, end_label, edges, rel_type, rel_name):
        """Merge a ``rel_type`` relationship for every distinct edge.

        Args:
            start_label: Label of the start nodes.
            end_label: Label of the end nodes.
            edges: Iterable of ``[start_name, end_name]`` pairs.
            rel_type: Relationship type used in the Cypher statement.
            rel_name: Value stored in the relationship's ``name`` property.
        """
        # De-duplicate on (start, end) tuples rather than on a joined string,
        # so a name that contains the separator can never be split wrongly.
        unique_edges = {tuple(edge) for edge in edges}
        total = len(unique_edges)

        # Labels and the relationship type are formatted into the statement;
        # every value is passed as a query parameter, which avoids quoting
        # and injection problems. The statement is the same for all edges.
        query = (
            "MATCH (p:%s {name: $p_name}), (q:%s {name: $q_name}) "
            "MERGE (p)-[rel:%s {name: $rel_name}]->(q)"
        ) % (start_label, end_label, rel_type)

        count = 0  # statements that ran without raising
        for p_name, q_name in unique_edges:
            if not p_name or not q_name:
                continue
            try:
                self.g.run(query, p_name=p_name, q_name=q_name, rel_name=rel_name)
                count += 1
                if count % 100 == 0:
                    print(f"{rel_name} 關系創建: {count}/{total}")
            except Exception as e:
                # One failing edge must not abort the remaining ones.
                print(f"創建關系失敗: {e}, 邊: {p_name} -> {q_name}")
        print(f"? {rel_name} 關系創建完成,共 {count} 個")

    def create_graphrels(self):
        """Create all relationships derived from the data file."""
        (_, _, _, _, rels_disease_drug, rels_disease_department,
         rels_department_department) = self.read_nodes()
        self.create_relationship(
            'Disease', 'Drug', rels_disease_drug, 'RECOMMAND_EAT', '宜吃')
        self.create_relationship(
            'Disease', 'Department', rels_disease_department, 'BELONGS_TO', '所屬科室')
        self.create_relationship(
            'Department', 'Department', rels_department_department, 'BELONGS_TO', '屬于')

    def export_data(self):
        """Write the sorted node names to disease/drug/department .txt files."""
        diseases, drugs, departments, _, _, _, _ = self.read_nodes()
        exports = [
            ('disease.txt', diseases),
            ('drug.txt', drugs),
            ('department.txt', departments),
        ]
        for filename, data in exports:
            with open(filename, 'w', encoding='utf-8') as f:
                f.write('\n'.join(sorted(data)))
            print(f"? 已導出 {filename}")


if __name__ == '__main__':
    medical_graph = MedicalGraph()
    medical_graph.create_graphnodes()
    medical_graph.create_graphrels()
    medical_graph.export_data()
運行結果
查看庫中信息
這下就可以了,數據已經存在庫中了,各自的對應關系也已經有了,那么后面就是對查詢出來的數據進行具體的操作了。
操作技巧
讀取一個 JSON 文件
假設你有一個文件 data.json,內容如下:
{"name": "張三","age": 30,"city": "北京","hobbies": ["讀書", "游泳", "編程"],"is_student": false
}
讀取代碼:
import json

# Open the JSON file and parse its whole content in one call
with open('data.json', 'r', encoding='utf-8') as file:
    data = json.load(file)

# data is now an ordinary Python dict
print(data)
for caption, key in (("姓名:", 'name'), ("年齡:", 'age'), ("愛好:", 'hobbies')):
    print(caption, data[key])
🔍 輸出結果:
{'name': '張三', 'age': 30, 'city': '北京', 'hobbies': ['讀書', '游泳', '編程'], 'is_student': False}
姓名: 張三
年齡: 30
愛好: ['讀書', '游泳', '編程']
處理不同類型的 JSON 文件
📌 情況 1:JSON 文件是一個數組(列表)
[{"name": "張三", "age": 30},{"name": "李四", "age": 25}
]
import json

# The file holds a JSON array, so json.load() returns a list of dicts
with open('users.json', 'r', encoding='utf-8') as file:
    users = json.load(file)

for entry in users:
    print(f"姓名: {entry['name']}, 年齡: {entry['age']}")
📌 情況 2:JSON Lines 格式(每行一個 JSON)
每行是一個獨立的 JSON 對象,常用于大數據:
{"name": "張三", "age": 30}
{"name": "李四", "age": 25}
import json

with open('data.jsonl', 'r', encoding='utf-8') as file:
    # Every non-blank line is one JSON document. Note the use of
    # json.loads(), which parses a string, not json.load().
    stripped_lines = (raw.strip() for raw in file)
    data_list = [json.loads(text) for text in stripped_lines if text]

for item in data_list:
    print(item)
遺留問題
后面有機會再試試milvus向量庫。看看圖片是怎么操作的。