上一篇 介紹了數據接入處理的整體方案設計。本篇介紹基于SmartETL框架的流程實現。
5. 流程開發
5.1.簡單采集流程
從指定時間(yy年 mm月)開始,持續采集arXiv論文。基于月份和順序號,構造論文ID,進而下載論文PDF文件,保存到本地。
5.1.1.Loader設計
定制開發一個新的Loader“web.ArXivTaskEmit”,基于月份和順序號構造并輸出論文ID。每個月的順序號從00001到29999。輸出格式樣例:2503.00001(字符串)。代碼如下:
import time
import os
import jsonfrom wikidata_filter.util.dates import current_date
from wikidata_filter.loader.base import DataProviderclass ArXivTaskEmit(DataProvider):ts_file = ".arxiv.ts"def __init__(self, start_month: int = None, end_month: int = None):self.month = start_month or 2501self.seq = 1if os.path.exists(self.ts_file):with open(self.ts_file, encoding="utf8") as fin:nums = json.load(fin)self.month = nums[0]self.seq = nums[1]self.end_month = end_month or int(current_date('%y%m'))print(f"from {self.month}.{self.seq} to {self.end_month}")def write_ts(self):row = [self.month, self.seq]with open(self.ts_file, "w", encoding="utf8") as out:json.dump(row, out)def iter(self):while self.month <= self.end_month:while self.seq < 30000:new_id = f'{self.month}.{self.seq:05d}'print("processing:", new_id)yield new_idself.seq += 1self.write_ts()time.sleep(3)self.month += 1if self.month % 100 > 12:self.month += int(self.month/100)*100 + 101self.seq = 1self.write_ts()
5.1.2.Processor設計
處理過程包括:
- 將輸入數據包裝為dict結構。基于內置
ToDict
轉換算子。轉換后,數據變成:{‘d’: ‘2503.00001’} - 構造PDF文件URL。基于內置
ConcatFields
轉換算子。轉換后,數據變成:{‘d’: 2503.00001’, ‘url_pdf’: ‘https://arxiv.org/pdf/2503.00001’} - 構造保存本地的文件名。基于內置
ConcatFields
轉換算子。轉換后,數據變成:{‘d’: 2503.00001’, ‘url_pdf’: ‘https://arxiv.org/pdf/2503.00001’, ‘filename’: ‘data/arxiv/2503.00001.pdf’} - 判斷文件不存在。基于內置
Not
算子,組合util.files.exists
函數。 - 文件下載。基于內置
Map
算子,組合util.http.content
函數,將獲取的文件內容(bytes
類型)作為字段content
的值。 - 保存文件。基于內置
SaveFiles
算子。
上述流程SmartETL的YAML流程定義語法,基于已有的組件進行實例化,并通過Chain
進行組合,形成順序處理流程,完成文件持續下載。
5.1.3.流程定義
name: arXiv簡單采集流程
description: 對指定時間范圍arXiv論文進行采集loader: web.ArXivTaskEmitnodes:as_dict: ToDictmake_url: ConcatFields('url_pdf', 'd', prefix=’https://arxiv.org/pdf/’)make_filename: ConcatFields('filename', 'd', prefix='data/arxiv/', suffix='.pdf')file_not_exists: Not('util.files.exists', key='filename')download: Map('util.http.content', key='url_pdf', target_key='content', most_times=3, ignore_error=True)save_file: WriteFiles('data/arxiv', name_key=’filename’)processor: Chain(as_dict, make_url, make_filename, file_not_exists, download, save_file)
5.2.論文搜索&HTML采集流程
針對指定關鍵詞,利用arXiv API搜索論文,下載論文HTML頁面。
5.2.1.Loader設計
通過命令行的交互輸入(input
函數)獲取關鍵詞作為任務輸入。
5.2.2.Processor設計
處理過程包括:
- 通過命令行獲取用戶輸入關鍵詞。輸出數據為字符串,例如“RAG”。
- 基于輸入關鍵詞進行檢索,基于arXiv官網搜索API進行檢索。由于檢索結果為XML格式,基于
xmltodict
庫轉換為JSON格式,對論文結構附加url_pdf
、url_html
字段,表示PDF和HTML的下載鏈接。這一步采用定制開發函數gestata.arxiv.search
,配合Map
算子進行處理。 - 【可選的】數組打散。如果前一步
search
函數返回為數組結構,則需要通過Flat
組件進行打散;如果為生成器yield
方式返回,則框架自行處理。 - 下載HTML。內置
util.http.content
函數提供http請求(基于requests
庫),結合Map
算子,實現arxiv HTML文件下載,將獲取的文件內容(bytes)作為字段content
的值。 - 過濾。判斷
content
是否為空,如果網絡連接失敗或者HTML文件本身不存在,就可以終止。 - 保存HTML。將前一步獲取的HTML內容(即
content
字段值)保存為文件。
搜索核心代碼如下:
ARXIV_API_BASE = "http://export.arxiv.org/api/query"
ARXIV_BASE = "http://arxiv.org"def search(topic: str, max_results: int = 50):"""基于arXiv API的論文搜索"""if ':' not in topic:topic = 'all:' + topicparams = {"search_query": topic,"max_results": max_results,"sortBy": "lastUpdatedDate","sortOrder": "descending"}res = requests.get(ARXIV_API_BASE, params=params)doc = xmltodict.parse(res.text)feed = doc.get("feed")if "entry" not in feed:return []papers = feed.get("entry")for paper in papers:_id = paper["id"]_id = _id[_id.rfind('/')+1:]paper["_id"] = _idpaper["url_pdf"] = f"{ARXIV_BASE}/pdf/{_id}"paper["url_html"] = f"{ARXIV_BASE}/html/{_id}"return papers
在SmartETL框架最新設計中,通過將函數動態綁定為處理節點,只需要將業務處理邏輯實現為一個函數,就能夠在流程中進行調用,極大方便了數據處理算子開發流程。
5.2.3.流程定義
name: arXiv搜索采集流程
description: 基于用戶提供的關鍵詞進行arXiv論文并下載loader: Input('請輸入arXiv論文搜索關鍵詞:')nodes:search: Map('gestata.arxiv.search', max_results=10)download: Map('util.http.content', key='url_html', target_key='content', most_times=3, ignore_error=True)filter: FieldsNonEmpty('content')save: WriteFiles('data/arxiv', name_key='_id', suffix='.html')processor: Chain(search, Flat(), download, filter, save)
5.3.HTML解析建索引流程
對下載的HTML頁面進行解析,并對論文摘要和正文建立向量化索引。
5.3.1.HTML解析
由于arXiv論文的HTML頁面具有特定的網頁結構,通過lxml或beaultifulsoup庫,可實現對論文標題、作者、章節(包括其中的圖片、表格、公示等)列表、附錄、參考文獻等信息的精準識別。
5.3.2.Loader設計
可直接復用前文所述的自動生成論文ID的“web.ArXivTaskEmit”,也可以采用基于關鍵詞搜索的Input
+gestata.arxiv.search
。
5.3.3.Processor設計
處理過程包括:
- HTML解析。設計函數組件
gestata.arxiv.extract
,從輸入的HTML(str
類型)中進行論文結構抽取。注意,為了對HTML中的相對鏈接進行絕對定位,解析HTML時需要提供該HTML的基路徑(base path)。此外,設計的函數考慮到可能在其他地方調用,建議提供相關參數配置。 - 論文摘要索引。論文摘要是一個單獨的字段,并且通常長度較短,符合向量化索引chunk的長度要求,可以通過以下兩步建立索引:
2.1 向量化。通過內置的model.embed.Local
組件,調用embedding模型,生成摘要的向量
2.2 寫向量庫。利用提前初始化的Qdrant
組件,基于內置的DatabaseWriter
組件進行寫入。 - 論文正文索引。論文包括多個章節(section),并且篇幅較長,通常需要進行chunk化拆分。一種簡單的方式先按section拆分(避免不同章節的內容成為一個chunk),然后根據段落和句子邊界進行長度拆分。過程如下:
3.1 對sections進行打散,即轉成每個section作為一條記錄輸出。
3.2 對每個section的content
字段進行chunk化拆分并打散
3.3 向量化。同上。
3.4 寫向量庫。同上。
5.3.4.流程定義
name: arXiv論文HTML解析與索引流程
description: 對論文HTML進行解析并建立摘要和正文的向量索引loader: Directory('data/arxiv', '.html')nodes:qd: util.database.qdrant.Qdrant(**qdrant)select: SelectVal('data')extract: Map('gestata.arxiv.extract_from_html')vector1: model.embed.Local(api_base=bge_large_en, key='abstract', target_key='vector')write1: DatabaseWriter(qd, buffer_size=1, collection='chunk_abstract')chain1: Chain(Select('abstract'), vector1, write1)flat: FlatProperty('sections', inherit_props=True)select2: Select('content')chunk: Map('util.split.simple', key='content',target_key='chunks')flat_chunk: Flat(key='chunks')vector2: model.embed.Local(api_base=bge_large_en, key='chunks', target_key='vector')write2: DatabaseWriter(qd, buffer_size=1, collection='chunk_content')chain2: Chain(Select('sections', flat, select2, chunk, flat_chunk, vector2, write2)processor: Chain(select, extract, Fork(chain1, chain2, copy_data=True))