SmartETL中數據庫操作與流程解耦的設計與應用

正如ETL這個概念本身所指示的,數據庫讀寫訪問是ETL的最常用甚至是最主要的操作。現代信息系統的設計與運行基本都是圍繞數據庫展開的,很多應用的核心功能都是對數據庫的CRUD(創建、檢索、更新、刪除)操作。

SmartETL框架設計之初就考慮到了這個情況,在早期就根據團隊的技術棧,實現了對MongoDBMySQLElasticSearchClickHouse等數據庫的Extract操作(即Loader組件)和Load操作(即Processor組件)。具體來說,是在wikidata_filter.loader模塊和wikidata_filter.iterator模塊下分別創建了名為database的子模塊,分別實現了相應的數據庫組件。

ElasticSearch全文索引數據庫操作為例,為了實現將數據寫入ES,即建立ES全文索引,框架提供了wikidata_filter.iterator.database.elasticsearch.ESWriter組件,提供基于批量模式將一組JSON對象寫入ES中。代碼如下:

class ESWriter(BufferedWriter):"""數據寫入ES索引中"""def __init__(self, host="localhost",port=9200,username=None,password=None,index=None,buffer_size=1000, **kwargs):super().__init__(buffer_size=buffer_size)self.url = f"http://{host}:{port}"if password:self.auth = (username, password)else:self.auth = Noneself.index_name = indexdef write_batch(self, rows: list):header = {"Content-Type":  "application/json"}lines = []for row in rows:action_row = {}for key in id_keys:if key in row:action_row["_id"] = row.pop(key)break# row_meta = json.dumps({"index": action_row})row_meta = json.dumps({"index": action_row})try:row_data = json.dumps(row)lines.append(row_meta)lines.append(row_data)except:passbody = '\n'.join(lines)body += '\n'print(f"{self.url}/{self.index_name} bulk")res = requests.post(f'{self.url}/{self.index_name}/_bulk', data=body, headers=header, auth=self.auth)if res.status_code != 200:print("Warning, ES bulk load failed:", res.text)return Falsereturn True

需要注意,為了提高寫ES的效率,ESWriter并不是直接實現JsonIterator,而是繼承自BufferedWriter,通過重寫write_batch方法,基于ES的bulk接口,實現了批量寫入ES。

類似的,為了實現從ES讀取數據,可以基于ES檢索接口(POST /{index}/_search)或Scroll接口(POST /{index}/_search/scroll)實現檢索Loader組件或Scroll模式的Loader組件;為了實現ES數據刪除,基于ES刪除接口(DELETE /{index}/_doc/{id})實現刪除Processor組件;為了判斷數據是否存在,基于ES詳情接口(GET /{index}/_doc/{id})實現是否存在的Processor組件;……

這種方式比較簡單直觀,也很容易實現,但是隨著應用中需要集成的數據庫種類越來越多、數據操作越來越多樣化,我們就會發現,為了實現對數據庫的訪問操作,需要針對每一類操作開發Loader組件或Processor組件,最后數據庫相關操作代碼就會分散在多個模塊、函數中,不便于組件使用、維護和擴展。

有沒有可能設計一套專門的機制,實現數據庫操作與流程節點分離,同時根據需要進行綁定?當然可以!

出于這樣的目的,本文設計了獨立的數據庫接口體系,并基于函數式組件機制實現數據庫獨立接口與流程的松耦合綁定。核心過程包括3步:

首先,定義一套數據庫操作接口Database,包括數據庫級別的表格列表list_tables、獲取表格元數據desc_table等和表(集合、索引)級別的寫入upsert、掃描scroll、檢索search、獲取詳情get、是否存在exists、刪除delete等。這類操作可以根據業務需要就行擴展。類圖如下所示:
數據庫接口類圖
注意,Database接口函數通過使用命名參數,這些參數可以通過SmartETLYAML流程進行配置,方便傳遞特定數據庫的配置。

第二,根據實際需要的數據庫類型,實現對應的Database類。事實上,這就是“橋接模式”的應用,將數據庫SDK提供的接口轉換為本項目的Database接口。以下是一個ElasticSearch實現類(基于ES-HTTP接口)的示例代碼:

import json
import requests
from requests.auth import HTTPBasicAuth
from .base import Databaseid_keys = ["_id", "id", "mongo_id"]headers = {'Content-Type': 'application/json','Accept': 'application/json'
}
class ES(Database):"""讀取ES指定索引全部數據,支持提供查詢條件"""def __init__(self, host: str = "localhost",port: int = 9200,username: str = None,password: str = None,index: str = None,secure: bool = False,**kwargs):self.url = f"{'https' if secure else 'http'}://{host}:{port}"if password:self.auth = HTTPBasicAuth(username, password)else:self.auth = Noneself.index = indexdef search(self, query: dict = None,query_body: dict = None,fetch_size: int = 10,index: str = None,**kwargs):index = index or self.indexquery_body = query_body or {}if query:query_body['query'] = queryelif 'query' not in query_body:query_body['query'] = {"match_all": {}}if 'size' not in query_body:query_body['size'] = fetch_sizeprint("ES search query_body:", query_body)res = requests.post(f'{self.url}/{index}/_search', auth=self.auth, json=query_body, **kwargs)if res.status_code != 200:print("Error:", res.text)returnres = res.json()if 'hits' not in res or 'hits' not in res['hits']:print('ERROR', res)returnhits = res['hits']['hits']for hit in hits:# print(hit)doc = hit.get('_source') or {}doc['_id'] = hit['_id']doc['_score'] = hit['_score']if 'fields' in hit:doc.update(hit['fields'])yield docdef scroll(self, query: dict = None,query_body: dict = None,batch_size: int = 10,fetch_size: int = 10000,index: str = None,_scroll: str = "1m",**kwargs):index = index or self.indexquery_body = query_body or {}if query:query_body['query'] = queryelif 'query' not in query_body:query_body['query'] = {"match_all": {}}if 'size' not in query_body:query_body['size'] = batch_sizeprint("ES scroll query_body:", query_body)scroll_id = Nonetotal = 0while True:if scroll_id:# 后續請求url = f'{self.url}/_search/scroll'res = requests.post(url, auth=self.auth, json={'scroll': _scroll, 'scroll_id': scroll_id}, **kwargs)else:# 第一次請求 scrollurl = f'{self.url}/{index}/_search?scroll={_scroll}'res = requests.post(url, auth=self.auth, json=query_body, **kwargs)if res.status_code != 200:print("Error:", res.text)breakres = res.json()if 'hits' not in res or 'hits' not in res['hits']:print('ERROR', res)continueif '_scroll_id' in res:scroll_id = res['_scroll_id']hits = res['hits']['hits']for hit in hits:doc = hit.get('_source') or {}doc['_id'] = hit['_id']yield doctotal += len(hits)if len(hits) < batch_size or 0 < fetch_size <= total:breakif scroll_id:# clear scrollurl = f'{self.url}/_search/scroll'requests.delete(url, auth=self.auth, json={'scroll_id': scroll_id})def exists(self, _id, index: str = None, **kwargs):index = index or self.indexif isinstance(_id, dict):_id = _id.get("_id") or _id.get("id")url = f'{self.url}/{index}/_doc/{_id}?_source=_id'res = requests.get(url, auth=self.auth)if res.status_code == 200:return res.json().get("found") is Truereturn Falsedef delete(self, _id, index: str = None, **kwargs):index = index or self.indexif isinstance(_id, dict):_id = _id.get("_id") or _id.get("id")url = f'{self.url}/{index}/_doc/{_id}'res = requests.delete(url, auth=self.auth)return res.status_code == 200def upsert(self, items: dict or list, index: str = None, **kwargs):index = index or self.indexheader = {"Content-Type": "application/json"}if not isinstance(items, list):items = [items]lines = []for row in items:action_row = {}for key in id_keys:if key in row:action_row["_id"] = row.pop(key)breakrow_meta = json.dumps({"index": action_row})row_data = json.dumps(row)lines.append(row_meta)lines.append(row_data)body = '\n'.join(lines)body += '\n'print(f"{self.url}/{index} bulk")res = requests.post(f'{self.url}/{index}/_bulk', data=body, headers=header, auth=self.auth)if res.status_code != 200:print("Warning, ES bulk load failed:", res.text)return Falsereturn True

SmartETL項目中根據目前業務流程需求,初步實現了ElasticSearchClickHouseMongoDBMySQLPostgreSQLQdrantSQLiteMinIO等8類數據庫,類結構圖如下所示:
數據庫類圖結構
第三,定義一組數據庫操作的代理函數,將流程的數據庫調用需求轉發給數據庫組件。目前定義為gestata.dbops模塊,代碼很簡單,如下所示:

from wikidata_filter.util.database.base import Databasedef tables(db: Database, *database_list, columns: bool = False):if database_list:for database in database_list:for table in db.list_tables(database):yield {"name": table,"columns": db.desc_table(table, database) if columns else [],"database": database}else:for table in db.list_tables():yield {"name": table,"columns": db.desc_table(table) if columns else []}def search(db: Database, **kwargs):return db.search(**kwargs)def scroll(db: Database, **kwargs):return db.scroll(**kwargs)def upsert(row: dict or list, db: Database, **kwargs):
db.upsert(row, **kwargs)
return rowdef delete(_id, db: Database, **kwargs):return db.delete(_id, **kwargs)def exists(_id, db: Database, **kwargs) -> bool:return db.exists(_id, **kwargs)def get(_id, db: Database, **kwargs):return db.get(_id, **kwargs)

這里需要注意各個函數的返回值,大多數直接返回數據庫組件對應方法的執行結果,但upsert返回的是輸入參數。這是為什么呢?這樣就能夠將數據庫寫入操作作為流程的中間節點,也就是說數據經過入庫流程,但沒有終止,而是繼續流入后續節點中,入下圖所示:
入庫流程示意
至此,我們完成了將數據庫組件與流程節點組件進行解耦的設計。下面來看一個應用案例,通過讀取arXiv數據集(來自kaggle,可參考這篇文章),寫入ElasticSearch(建立全文索引),流程定義如下:

from: local/db_envs.yamlname: load arXiv meta flow
description: 讀取arXiv-meta數據集,寫入ES
arguments: 1consts:type_mapping:all: .jsonlnodes:es: util.database.elasticsearch.ES(**es1, index='arxiv-meta-2505')select: SelectVal('data')rename: RenameFields(id='_id')change_id: "Map(lambda s: s.replace('/', ':'), key='_id')"
# 建立ES全文索引write_es: Map('gestata.dbops.upsert', es)loader: ar.Zip(arg1, 'all', type_mapping=type_mapping)
processor: Chain(select, rename, change_id, Buffer(1000), write_es, Count(label=’total-papers’)

源代碼及流程定義詳見SmartETL項目。需要注意,由于項目持續演化,目前除了Kafka,其他數據庫組件都已經按照新的方式完成重構。

總結:本文闡述了SmartETL項目中的數據庫與流程解耦的設計,包括動機、目的、設計思路、應用案例。作為軟件設計中的一條基本原則,高內聚、松耦合是我們持續追求的目標,也只有好的設計,才能讓我們的代碼能夠易于維護與擴展,從而快速響應業務需求,降低開發成本。

本文來自互聯網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。
如若轉載,請注明出處:http://www.pswp.cn/news/912215.shtml
繁體地址,請注明出處:http://hk.pswp.cn/news/912215.shtml
英文地址,請注明出處:http://en.pswp.cn/news/912215.shtml

如若內容造成侵權/違法違規/事實不符,請聯系多彩編程網進行投訴反饋email:809451989@qq.com,一經查實,立即刪除!

相關文章

【記錄解決問題】activiti--sql 轉義符設置

一、背景 %、&#xff01;、_在sql查詢時需要轉義&#xff0c;轉義的語法 like %?2% escape ?#{escapeCharacter()}二、activiti轉義配置 String wildcardEscapeClause ""; if (this.databaseWildcardEscapeCharacter ! null && this.databaseWildcard…

Unity AR構建維護系統的以AI驅動增強現實知識檢索系統

本博客概述了為維護開發的AI驅動增強現實&#xff08;AR&#xff09;知識檢索系統的開發過程&#xff0c;該系統集成了Unity用于AR、Python服務器用于后端處理&#xff0c;以及ChatGPT用于自然語言處理。該系統允許維護工人通過AR設備&#xff08;如HoloLens 2&#xff09;查詢…

Java面向對象核心:方法值傳遞與封裝機制精講

文章目錄 Java面向對象編程核心筆記一、方法值傳遞機制1. 基本數據類型傳遞2. 引用數據類型傳遞值傳遞總結 二、面向對象核心概念1. 類與對象關系2. 類定義規范3. 對象創建與使用 三、封裝機制詳解1. 封裝三大要素2. 封裝示例&#xff08;GirlFriend類&#xff09;3. 測試類4. …

【Actix Web】構建高性能 Rust API:Actix Web 最佳實踐與進階指南

目錄 一、高性能 API 架構設計1.1 系統架構圖1.2 核心組件 二、項目初始化與配置2.1 創建項目2.2 添加依賴 (Cargo.toml)2.3 配置文件 (config/default.toml) 三、核心模塊實現3.1 應用狀態管理 (src/state.rs)3.2 數據模型定義 (src/models.rs) 四、認證與授權系統4.1 JWT 認證…

vue項目中純前端實現導出pdf文件,不需要后端處理。

在 Vue 項目中&#xff0c;純前端實現導出 PDF 文件是完全可行的。通常可以借助一些 JavaScript 庫來將 HTML 內容或 DOM 元素轉換為 PDF 并下載&#xff0c;無需后端參與。 下面介紹幾種常用的方案和實現方法&#xff1a; 推薦方案&#xff1a;使用 html2canvas jsPDF 安裝…

c++虛擬內存

常見的內存困惑 當你編寫C程序時&#xff0c;是否遇到過&#xff1a; vector申請200MB內存&#xff0c;但系統顯示只占用20MB&#xff1f;程序在低配機器上崩潰&#xff0c;報出std::bad_alloc但內存顯示充裕&#xff1f;遍歷數組時特定位置耗時突然增加&#xff1f;相同代碼…

領域驅動設計(DDD)【22】之限定建模技術

文章目錄 一 限定初識二 限定識別三 限定實現 一 限定初識 一個 員工 可以擁有多份 工作經驗&#xff0c;而各個 工作經驗 的 時間段 不能相互重疊。可以得出一個推論&#xff1a;對于一個 員工 而言&#xff0c;每個 時間段 只能有一條 工作經驗。 UML中第二種表述方式&…

《P6492 [COCI 2010/2011 #6] STEP》

題目描述 給定一個長度為 n 的字符序列 a&#xff0c;初始時序列中全部都是字符 L。 有 q 次修改&#xff0c;每次給定一個 x&#xff0c;若 ax? 為 L&#xff0c;則將 ax? 修改成 R&#xff0c;否則將 ax? 修改成 L。 對于一個只含字符 L&#xff0c;R 的字符串 s&#…

macOS,切換 space 失效,向右切換space(move right a space) 失效

背景 準確來講&#xff0c;遇到的問題是向右切換space&#xff08;move right a space) 失效&#xff0c;并向左是成功的。 在鍵盤-快捷鍵-調度中心中&#xff0c;所有的快捷鍵均可用&#xff0c;但是“向右移動一個空間”總是失效。 已經檢查過不是快捷鍵沖突的問題&#x…

網飛貓官網入口 - 免費高清影視平臺,Netflix一站觀看

網飛貓是一個專注于提供豐富影視資源的在線平臺&#xff0c;涵蓋國內外熱門電影、電視劇、動漫、綜藝等多種類型。它不僅整合了Netflix的獨家內容&#xff0c;還提供了大量高清、藍光畫質的影視作品&#xff0c;支持多語言字幕&#xff0c;滿足不同用戶的觀影需求。網飛貓的界面…

Hyper-v-中的FnOs--飛牛Nas虛擬磁盤擴容(不清除數據)

在Hyper-v下的飛牛Nas要怎么在不刪除原有虛擬磁盤數據的情況下擴容呢 OK下面開始教學&#xff08;適用于Basic模式的虛擬磁盤擴容&#xff0c;Linear沒試過&#xff09; 先關閉飛牛Nas系統 找到飛牛Nas虛擬機&#xff0c;在設置下SCSI控制器找到要擴容的虛擬磁盤&#xff0c; 點…

掌握 MySQL 的基石:全面解讀數據類型及其影響

前言 上篇文章小編講述了關于MySQL表的DDL操作&#xff0c;在那里我多次使用了MySQL的數據類型&#xff0c;但是我并沒有去講述MySQL的數據類型&#xff0c;想必各位讀者已經很好奇MySQL的數據類型都有什么了&#xff0c;今天這篇文章我將會詳細的去講述MySQL的數據類型&#x…

buildadmin 如何制作自己的插件

官方文檔指引 提示&#xff1a;若不計劃發布到應用市場&#xff0c;可省略圖片等非必要功能 參考文檔&#xff1a;https://doc.buildadmin.com/senior/module/basicInfo.html 目錄 官方文檔指引開發說明模塊開發流程模塊包結構示例安裝開發工具 總結 開發說明 目標&#xff…

【數據標注師】關鍵點標注

目錄 一、 **關鍵點標注的四大核心原則**二、 **五階能力培養體系**? **階段1&#xff1a;基礎認知筑基&#xff08;1-2周&#xff09;**? **階段2&#xff1a;復雜場景處理技能? **階段3&#xff1a;三維空間標注&#xff08;進階&#xff09;**? **階段4&#xff1a;效率…

創建網站的基本步驟?如何建設自己的網站?

創建網站是一個系統化的過程&#xff0c;涵蓋規劃、設計、開發、測試和發布等多個階段。以下是詳細步驟及關鍵工具推薦&#xff1a; 一、規劃階段&#xff1a;明確目標與內容 定義目標 1、確定網站目的&#xff08;展示信息、銷售、博客、服務等&#xff09;。 2、分析目標…

FreeSWITCH配置文件解析(2) dialplan 撥號計劃中xml 的action解析

在 FreeSWITCH 的撥號計劃&#xff08;Dialplan&#xff09;中&#xff0c;使用 XML 配置。其中&#xff0c;<action> 標簽用于指定要執行的操作。這些操作通常是應用程序&#xff08;applications&#xff09;或設置變量等。下面列出常見的 <action> 類型及其含義…

MCPA2APPT:基于 A2A+MCP+ADK 的多智能體流式并發高質量 PPT 智能生成系統

&#x1f680; MCPA2APPT / MultiAgentPPT 集成 A2A MCP ADK 架構的智能化演示文稿生成系統&#xff0c;支持多智能體協作與流式并發&#xff0c;實時生成高質量 PPT 內容。 &#x1f9e0; 項目簡介 MultiAgentPPT&#xff08;又名 MCPA2APPT&#xff09;采用 A2A&#xff…

Maven 多模塊項目調試與問題排查總結

&#x1f9d1; 博主簡介&#xff1a;CSDN博客專家&#xff0c;歷代文學網&#xff08;PC端可以訪問&#xff1a;https://literature.sinhy.com/#/?__c1000&#xff0c;移動端可微信小程序搜索“歷代文學”&#xff09;總架構師&#xff0c;15年工作經驗&#xff0c;精通Java編…

debian國內安裝docker

先升級apt和安裝依賴包 apt update apt upgrade apt install curl vim wget gnupg dpkg apt-transport-https lsb-release ca-certificates添加存儲庫的GPG密鑰&#xff08;阿里云&#xff09; curl -fsSL https://mirrors.aliyun.com/docker-ce/linux/debian/gpg | sudo gpg…

vue網頁中的一個天氣組件使用高德api

今天寫了一個天氣組件效果如下&#xff1a; 實現代碼如下&#xff1a; <template><div><span click"getLocation" style"cursor: pointer"><span style"color:white;">{{ weatherInfo.area }}</span></span&g…