As the name ETL itself suggests, reading from and writing to databases is among the most common, and arguably the most important, ETL operations. Modern information systems are designed and operated largely around databases, and the core functionality of many applications boils down to CRUD (create, retrieve, update, delete) operations against a database.
The SmartETL framework took this into account from the start. Early on, based on the team's technology stack, it implemented Extract operations (Loader components) and Load operations (Processor components) for databases such as MongoDB, MySQL, ElasticSearch, and ClickHouse. Concretely, a submodule named database was created under both the wikidata_filter.loader module and the wikidata_filter.iterator module, each implementing the corresponding database components.
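For reference, the layout at that stage looked roughly like this; only the package paths are given in the text above, and the concrete file names here are illustrative assumptions:

wikidata_filter/
    loader/
        database/             # Extract side: Loader components, one module per database
            elasticsearch.py
            mongodb.py
            mysql.py
            clickhouse.py
    iterator/
        database/             # Load side: Processor components, one module per database
            elasticsearch.py  # e.g. the ESWriter shown below
            mongodb.py
            mysql.py
            clickhouse.py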
Take the ElasticSearch full-text index database as an example. To write data into ES, i.e. to build an ES full-text index, the framework provides the wikidata_filter.iterator.database.elasticsearch.ESWriter component, which writes a batch of JSON objects into ES in bulk mode. The code is as follows:
# excerpt from the module; json, requests and the module-level id_keys list
# (e.g. ["_id", "id", "mongo_id"]) are imported/defined elsewhere in the same file
class ESWriter(BufferedWriter):
    """Write data into an ES index"""
    def __init__(self, host="localhost",
                 port=9200,
                 username=None,
                 password=None,
                 index=None,
                 buffer_size=1000, **kwargs):
        super().__init__(buffer_size=buffer_size)
        self.url = f"http://{host}:{port}"
        if password:
            self.auth = (username, password)
        else:
            self.auth = None
        self.index_name = index

    def write_batch(self, rows: list):
        header = {"Content-Type": "application/json"}
        lines = []
        for row in rows:
            # pick the first matching id field as the ES document _id
            action_row = {}
            for key in id_keys:
                if key in row:
                    action_row["_id"] = row.pop(key)
                    break
            row_meta = json.dumps({"index": action_row})
            try:
                row_data = json.dumps(row)
                lines.append(row_meta)
                lines.append(row_data)
            except Exception:
                # skip rows that cannot be serialized as JSON
                pass
        body = '\n'.join(lines)
        body += '\n'
        print(f"{self.url}/{self.index_name} bulk")
        res = requests.post(f'{self.url}/{self.index_name}/_bulk', data=body, headers=header, auth=self.auth)
        if res.status_code != 200:
            print("Warning, ES bulk load failed:", res.text)
            return False
        return True
Note that, to improve write throughput, ESWriter does not implement JsonIterator directly but inherits from BufferedWriter; by overriding the write_batch method and using ES's bulk API, it writes to ES in batches.
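Conceptually, the buffering works as follows. This is only a minimal sketch of the idea, not SmartETL's actual BufferedWriter code, and the per-record entry point name is hypothetical:

class BufferedWriterSketch:
    """Collect rows until buffer_size is reached, then hand them to write_batch in one call"""
    def __init__(self, buffer_size=1000):
        self.buffer_size = buffer_size
        self.buffer = []

    def on_data(self, row):
        # hypothetical per-record hook called by the flow engine
        self.buffer.append(row)
        if len(self.buffer) >= self.buffer_size:
            self.flush()
        return row

    def flush(self):
        if self.buffer:
            self.write_batch(self.buffer)
            self.buffer = []

    def write_batch(self, rows: list):
        # subclasses such as ESWriter override this with the real bulk write
        raise NotImplementedError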
Similarly, to read data from ES, a search Loader component or a scroll-mode Loader component can be built on top of the ES search API (POST /{index}/_search) or the scroll API (POST /{index}/_search/scroll); to delete ES data, a delete Processor component can be built on the ES delete API (DELETE /{index}/_doc/{id}); to check whether a document exists, an existence-check Processor component can be built on the ES get API (GET /{index}/_doc/{id}); and so on.
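For reference, here is what those endpoints look like when called directly with requests; the host, index name, and document id below are placeholders, and the actual Loader/Processor components wrap such calls with iteration, batching, and error handling:

import requests

base = "http://localhost:9200"   # placeholder host
index = "my-index"               # placeholder index

# search: POST /{index}/_search
hits = requests.post(f"{base}/{index}/_search",
                     json={"query": {"match_all": {}}, "size": 10}).json()["hits"]["hits"]

# existence check: GET /{index}/_doc/{id}
found = requests.get(f"{base}/{index}/_doc/doc-1").json().get("found", False)

# delete: DELETE /{index}/_doc/{id}
requests.delete(f"{base}/{index}/_doc/doc-1")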
This approach is simple, intuitive, and easy to implement. However, as the number of database types to integrate grows and data operations become more diverse, we end up having to develop a dedicated Loader or Processor component for every kind of operation. Database-related code then gets scattered across many modules and functions, which makes the components harder to use, maintain, and extend.
Could we design a dedicated mechanism that separates database operations from flow nodes, yet still binds them together on demand? Of course we can!
With this goal in mind, this article designs an independent database interface layer and uses the functional-component mechanism to bind these standalone database interfaces to flows in a loosely coupled way. The core process has three steps:
First, define a set of database operation interfaces, Database, covering database-level operations such as listing tables (list_tables) and fetching table metadata (desc_table), as well as table- (collection-, index-) level operations such as writing (upsert), scanning (scroll), searching (search), fetching a single record (get), existence checking (exists), and deleting (delete). These operations can be extended as business needs require. The class diagram is shown below:
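As a textual complement to the class diagram, a minimal sketch of such an interface could look like this; the signatures are illustrative, inferred from the operations listed above and the ES implementation shown later, and the actual SmartETL definitions may differ (for example, concrete subclasses name the target index, collection, or table differently):

class Database:
    """Unified database access interface (sketch; actual signatures may differ)"""

    # database-level operations
    def list_tables(self, database: str = None) -> list:
        raise NotImplementedError

    def desc_table(self, table: str, database: str = None) -> list:
        raise NotImplementedError

    # table / collection / index level operations
    def upsert(self, items, table: str = None, **kwargs) -> bool:
        raise NotImplementedError

    def scroll(self, table: str = None, **kwargs):
        raise NotImplementedError

    def search(self, query=None, table: str = None, **kwargs):
        raise NotImplementedError

    def get(self, _id, table: str = None, **kwargs):
        raise NotImplementedError

    def exists(self, _id, table: str = None, **kwargs) -> bool:
        raise NotImplementedError

    def delete(self, _id, table: str = None, **kwargs) -> bool:
        raise NotImplementedError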
Note that the Database interface functions take named parameters, which can be configured through SmartETL's YAML flows, making it easy to pass database-specific settings.
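For instance, connection settings can be kept in a shared file and spliced into a node definition with **. A minimal sketch of such a file, assuming the included file uses the same consts section as a flow file (the values are placeholders, and the keys mirror the ES constructor parameters):

# local/db_envs.yaml (placeholder values)
consts:
  es1:
    host: localhost
    port: 9200
    username: elastic
    password: changeme

A node can then be declared as es: util.database.elasticsearch.ES(**es1, index='my-index'), as in the application case later in this article.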
Second, implement the corresponding Database class for each database type actually needed. This is in fact an application of the Bridge pattern: converting the interface provided by a database SDK into this project's Database interface. Below is sample code for an ElasticSearch implementation class (based on the ES HTTP API):
import json
import requests
from requests.auth import HTTPBasicAuth

from .base import Database

id_keys = ["_id", "id", "mongo_id"]

headers = {
    'Content-Type': 'application/json',
    'Accept': 'application/json'
}


class ES(Database):
    """Read data from the specified ES index; optional query conditions are supported"""
    def __init__(self, host: str = "localhost",
                 port: int = 9200,
                 username: str = None,
                 password: str = None,
                 index: str = None,
                 secure: bool = False,
                 **kwargs):
        self.url = f"{'https' if secure else 'http'}://{host}:{port}"
        if password:
            self.auth = HTTPBasicAuth(username, password)
        else:
            self.auth = None
        self.index = index

    def search(self, query: dict = None,
               query_body: dict = None,
               fetch_size: int = 10,
               index: str = None,
               **kwargs):
        index = index or self.index
        query_body = query_body or {}
        if query:
            query_body['query'] = query
        elif 'query' not in query_body:
            query_body['query'] = {"match_all": {}}
        if 'size' not in query_body:
            query_body['size'] = fetch_size
        print("ES search query_body:", query_body)
        res = requests.post(f'{self.url}/{index}/_search', auth=self.auth, json=query_body, **kwargs)
        if res.status_code != 200:
            print("Error:", res.text)
            return
        res = res.json()
        if 'hits' not in res or 'hits' not in res['hits']:
            print('ERROR', res)
            return
        hits = res['hits']['hits']
        for hit in hits:
            doc = hit.get('_source') or {}
            doc['_id'] = hit['_id']
            doc['_score'] = hit['_score']
            if 'fields' in hit:
                doc.update(hit['fields'])
            yield doc

    def scroll(self, query: dict = None,
               query_body: dict = None,
               batch_size: int = 10,
               fetch_size: int = 10000,
               index: str = None,
               _scroll: str = "1m",
               **kwargs):
        index = index or self.index
        query_body = query_body or {}
        if query:
            query_body['query'] = query
        elif 'query' not in query_body:
            query_body['query'] = {"match_all": {}}
        if 'size' not in query_body:
            query_body['size'] = batch_size
        print("ES scroll query_body:", query_body)
        scroll_id = None
        total = 0
        while True:
            if scroll_id:
                # subsequent scroll requests
                url = f'{self.url}/_search/scroll'
                res = requests.post(url, auth=self.auth, json={'scroll': _scroll, 'scroll_id': scroll_id}, **kwargs)
            else:
                # first scroll request
                url = f'{self.url}/{index}/_search?scroll={_scroll}'
                res = requests.post(url, auth=self.auth, json=query_body, **kwargs)
            if res.status_code != 200:
                print("Error:", res.text)
                break
            res = res.json()
            if 'hits' not in res or 'hits' not in res['hits']:
                print('ERROR', res)
                continue
            if '_scroll_id' in res:
                scroll_id = res['_scroll_id']
            hits = res['hits']['hits']
            for hit in hits:
                doc = hit.get('_source') or {}
                doc['_id'] = hit['_id']
                yield doc
            total += len(hits)
            if len(hits) < batch_size or 0 < fetch_size <= total:
                break
        if scroll_id:
            # clear scroll
            url = f'{self.url}/_search/scroll'
            requests.delete(url, auth=self.auth, json={'scroll_id': scroll_id})

    def exists(self, _id, index: str = None, **kwargs):
        index = index or self.index
        if isinstance(_id, dict):
            _id = _id.get("_id") or _id.get("id")
        url = f'{self.url}/{index}/_doc/{_id}?_source=_id'
        res = requests.get(url, auth=self.auth)
        if res.status_code == 200:
            return res.json().get("found") is True
        return False

    def delete(self, _id, index: str = None, **kwargs):
        index = index or self.index
        if isinstance(_id, dict):
            _id = _id.get("_id") or _id.get("id")
        url = f'{self.url}/{index}/_doc/{_id}'
        res = requests.delete(url, auth=self.auth)
        return res.status_code == 200

    def upsert(self, items: dict or list, index: str = None, **kwargs):
        index = index or self.index
        header = {"Content-Type": "application/json"}
        if not isinstance(items, list):
            items = [items]
        lines = []
        for row in items:
            action_row = {}
            for key in id_keys:
                if key in row:
                    action_row["_id"] = row.pop(key)
                    break
            row_meta = json.dumps({"index": action_row})
            row_data = json.dumps(row)
            lines.append(row_meta)
            lines.append(row_data)
        body = '\n'.join(lines)
        body += '\n'
        print(f"{self.url}/{index} bulk")
        res = requests.post(f'{self.url}/{index}/_bulk', data=body, headers=header, auth=self.auth)
        if res.status_code != 200:
            print("Warning, ES bulk load failed:", res.text)
            return False
        return True
Driven by current business-flow needs, the SmartETL project has so far implemented eight database types: ElasticSearch, ClickHouse, MongoDB, MySQL, PostgreSQL, Qdrant, SQLite, and MinIO. The class structure diagram is shown below:
Third, define a set of proxy functions for database operations that forward the flow's database calls to the database component. These currently live in the gestata.dbops module; the code is simple, as shown below:
from wikidata_filter.util.database.base import Database


def tables(db: Database, *database_list, columns: bool = False):
    if database_list:
        for database in database_list:
            for table in db.list_tables(database):
                yield {
                    "name": table,
                    "columns": db.desc_table(table, database) if columns else [],
                    "database": database
                }
    else:
        for table in db.list_tables():
            yield {
                "name": table,
                "columns": db.desc_table(table) if columns else []
            }


def search(db: Database, **kwargs):
    return db.search(**kwargs)


def scroll(db: Database, **kwargs):
    return db.scroll(**kwargs)


def upsert(row: dict or list, db: Database, **kwargs):
    db.upsert(row, **kwargs)
    return row


def delete(_id, db: Database, **kwargs):
    return db.delete(_id, **kwargs)


def exists(_id, db: Database, **kwargs) -> bool:
    return db.exists(_id, **kwargs)


def get(_id, db: Database, **kwargs):
    return db.get(_id, **kwargs)
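These proxy functions can also be exercised as plain Python outside any flow; the index name is a placeholder and the dbops import path is assumed from the module name given above:

from wikidata_filter.util.database.elasticsearch import ES
from wikidata_filter.gestata import dbops   # full import path assumed; the text only names gestata.dbops

es = ES(host="localhost", port=9200, index="arxiv-meta-2505")

# stream documents out of the index
for doc in dbops.scroll(es, fetch_size=100):
    print(doc["_id"])

# upsert returns its input row, so it can sit in the middle of a processing chain
row = dbops.upsert({"id": "demo-1", "title": "hello"}, es)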
Pay attention to the return values of these functions: most of them directly return the result of the corresponding database-component method, but upsert returns its input argument. Why? This lets a database write act as an intermediate node in the flow: data passes through the write step without terminating there and keeps flowing into downstream nodes, as shown in the figure below:
With this, the design that decouples database components from flow-node components is complete. Let's look at an application case: reading the arXiv dataset (from Kaggle; see the linked article) and writing it into ElasticSearch (building a full-text index). The flow is defined as follows:
from: local/db_envs.yaml

name: load arXiv meta flow
description: read the arXiv-meta dataset and write it into ES
arguments: 1

consts:
  type_mapping:
    all: .jsonl

nodes:
  es: util.database.elasticsearch.ES(**es1, index='arxiv-meta-2505')
  select: SelectVal('data')
  rename: RenameFields(id='_id')
  change_id: "Map(lambda s: s.replace('/', ':'), key='_id')"
  # build the ES full-text index
  write_es: Map('gestata.dbops.upsert', es)

loader: ar.Zip(arg1, 'all', type_mapping=type_mapping)
processor: Chain(select, rename, change_id, Buffer(1000), write_es, Count(label='total-papers'))
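Assuming the flow above is saved as, say, flows/arxiv_meta.yaml, it would be launched through SmartETL's command-line entry point, roughly like this (the entry-point script name and argument handling are assumptions here and may differ by project version):

python main_flow.py flows/arxiv_meta.yaml path/to/arxiv-metadata.zip    # arg1 = the downloaded arXiv zip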
The source code and flow definitions are available in the SmartETL project. Note that, as the project keeps evolving, all database components except Kafka have now been refactored to this new approach.
Summary: This article described how the SmartETL project decouples databases from flows, covering the motivation, goals, design approach, and an application case. High cohesion and loose coupling are a basic principle of software design and a goal we keep pursuing; only good design keeps code easy to maintain and extend, so that we can respond to business needs quickly and keep development costs down.