SmartETL框架主要采用了面向對象的設計思想,將ETL過程中的處理邏輯抽象為Loader
和Processor
(對應loader
模塊和iterator
模塊),所有流程組件需要繼承或實現DataProvider
(iter方法)或JsonIterator
(on_data
或__process__
方法)。
例如以下代碼實現將論文結構中的摘要和正文拼接為一個字符串字段,方便后續對論文建立全文索引。
class ConcatPaperContent(JsonIterator):"""arxiv html頁面數據處理類"""def on_data(self, data: Any, *args):paper = data['paper']content = ""if paper:abstract = paper.get('abstract')content += f"{abstract}\n"sections = paper.get('sections')for section in sections:content += f"{section['content']}\n"data['content'] = contentreturn data
然而,業務中很多處理邏輯比較簡單,以往開發時用少數幾行代碼就可以搞定,而在SmartETL框架中,則必須實現一個類,正如上面的例子所示。雖然SmartETL支持加載外部包的組件(只要在sys.path
中),但如果是需要定制開發則相對繁瑣。
此前,在過濾組件(Filter
)中考慮到這種情況,解決辦法是在流程中定義Lambda表達式。例如以下流程定義中,filter節點通過Lambda表達式abnormal_time
實現過濾publish_time
字段值小于當前時間的記錄的功能,即,對于經過filter節點的記錄,僅當其publish_time
字段值大于等于當前時間current
時才會輸出給后續節點。
nodes:current: util.dates.current_ts(True)abnormal_time: "=lambda t, current=current: t >= current "filter: Filter(abnormal_time, key='publish_time')
為了簡化業務代碼編寫,SmartETL新增實現函數式組件,即以函數形式提供核心處理邏輯,而不需要封裝成類。Lambda表達式就是一種特殊的函數。
跟C/C++、Java不同,Python語言中函數是一等公民,即開發者可以直接訪問和操作函數,支持將函數作為一個對象進行加載、傳遞和管理,這對于開發一些高級功能,提高程序擴展性非常方便。
SmartETL函數式組件是指將任意編寫的數據處理函數作為ETL流程組件,加入到流程處理中。唯一的限制是:除了作為Loader
組件的函數外(框架無法提供輸入),函數應該以流程數據作為輸入參數,并將需要向后續流程傳遞的數據作為輸出參數。以下表格說明了函數的參數與節點類型作用的對應關系:
節點類型 | 是否支持輸入 | 是否要求有輸出 |
---|---|---|
Loader節點 | 否(可通過配置提供) | 是 |
Processor節點 | 是(流程數據作為第一個參數) | 均可 |
為了使用函數對象,框架設計了函數式Loader組件Function
如下:
class Function(DataProvider):"""函數調用包裝器 提供調用函數的結果"""def __init__(self, function, *args, **kwargs):""":param function 函數對象或函數對象的完整限定名(如wikidata_filter.util.files.get_lines)"""assert function is not None, "function is None!"if isinstance(function, str):from wikidata_filter.util.mod_util import load_clsfunction = load_cls(function)[0]self.function = functionself.args = argsself.kwargs = kwargsdef iter(self):"""DataProvider的主要API,對提供函數進行調用"""# 注意,使用了組件構造參數res = self.function(*self.args, **self.kwargs)if isinstance(res, GeneratorType):for item in res:yield itemelse:yield res
類似的,框架實現了Function(JsonIterator)
。常用的Map
組件也支持提供函數對象或函數對象完整限定名。
基于函數式組件對本文開頭的示例進行改寫,代碼如下:
def concat_paper_content(paper: dict):paper = paper or {}abstract = paper.get('abstract')content = f"{abstract}\n"sections = paper.get('sections')for section in sections:content += f"{section['content']}\n"return content
在yaml流程中進行引用,如下所示:
nodes:concat: Map('gestata.arxiv.concat_paper_content')
或者:
nodes:concat: Function('wikidata_filter.gestata.arxiv.concat_paper_content')
流程說明:通過yaml流程文件,將concat_content函數與Map
進行綁定(假設該函數定義在wikidata_filter.gestata.arxiv
模塊中),實現對基于paper
的處理,并將函數調用返回值作為content
字段值。
注意,為了支持Function
使用自定義組件(可能在任意sys.path
可訪問模塊),需要提供完整的函數對象限定名,本示例中包括頂層模塊wikidata_filter
。
那么,Map
與Function
有什么區別呢?主要區別是Map主要是為了支持wikidata_filter.gestata
和wikidata_filter.util
模塊中定義的函數,且支持指定要處理的字段(通過key
參數)和目標字段(通過target_key
參數)。
從示例中可以看出,使用函數式組件至少有幾點好處:
- 代碼更簡潔:只需要實現一個提供核心處理邏輯的函數即可。
- 配置更加靈活:通過流程指定輸入字段和輸出字段,可以靈活適配不同業務數據。
- 復用性更好:可以通過代碼或yaml配置進行復用。
在此前arXiv論文數據處理應用流程中,大量采用了函數式組件。具體可查看https://github.com/ictchenbo/SmartETL/blob/main/wikidata_filter/gestata/arxiv.py了解詳情。